28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
31 #include "MapDRelease.h"
34 #include "gen-cpp/CalciteServer.h"
68 #ifdef HAVE_RUNTIME_LIBS
84 #include <aws/core/auth/AWSCredentialsProviderChain.h>
88 #include <sys/types.h>
90 #include <boost/algorithm/string.hpp>
91 #include <boost/filesystem.hpp>
92 #include <boost/make_shared.hpp>
93 #include <boost/process/search_path.hpp>
94 #include <boost/program_options.hpp>
95 #include <boost/tokenizer.hpp>
108 #include <arrow/api.h>
109 #include <arrow/io/api.h>
110 #include <arrow/ipc/api.h>
115 #ifdef ENABLE_IMPORT_PARQUET
116 extern bool g_enable_parquet_import_fsi;
// Expands to the empty string literal. Going by its name it is the marker for a
// session id that is not valid; its uses are not visible in this part of the file.
131 #define INVALID_SESSION_ID ""
// SET_REQUEST_ID(parent_request_id): selects the logger request id for the current call.
//  - When g_uniform_request_ids_per_thrift_call and parent_request_id are both truthy,
//    the parent's id is reused through logger::set_request_id().
//  - Otherwise logger::set_new_request_id() is called (it is the init-statement of the
//    C++17 `else if`), and, if parent_request_id is truthy, an INFO line recording the
//    parent id is logged.
// The expansion is an unbraced if/else chain that ends in a stream expression with no
// trailing semicolon, so invoke it as a full statement: `SET_REQUEST_ID(id);`.
// The argument is substituted unparenthesized in several places and can be evaluated
// more than once; pass a plain value without side effects.
133 #define SET_REQUEST_ID(parent_request_id) \
134 if (g_uniform_request_ids_per_thrift_call && parent_request_id) \
135 logger::set_request_id(parent_request_id); \
136 else if (logger::set_new_request_id(); parent_request_id) \
137 LOG(INFO) << "This request has parent request_id(" << parent_request_id << ')'
139 #define THROW_DB_EXCEPTION(errstr) \
142 ex.error_msg = errstr; \
143 LOG(ERROR) << ex.error_msg; \
153 const int32_t user_id,
154 const std::string& dashboard_name) {
166 extern std::unique_ptr<std::string> g_libgeos_so_filename;
170 const std::vector<LeafHostInfo>& string_leaves,
171 const std::string& base_data_path,
172 const bool allow_multifrag,
173 const bool jit_debug,
174 const bool intel_jit_profile,
175 const bool read_only,
176 const bool allow_loop_joins,
177 const bool enable_rendering,
178 const bool renderer_prefer_igpu,
179 const unsigned renderer_vulkan_timeout_ms,
180 const bool renderer_use_parallel_executors,
181 const bool enable_auto_clear_render_mem,
182 const int render_oom_retry_threshold,
183 const size_t render_mem_bytes,
184 const size_t max_concurrent_render_sessions,
185 const size_t reserved_gpu_mem,
186 const bool render_compositor_use_last_gpu,
187 const bool renderer_enable_slab_allocation,
188 const size_t num_reader_threads,
191 const bool legacy_syntax,
192 const int idle_session_duration,
193 const int max_session_duration,
194 const std::string& udf_filename,
195 const std::string& clang_path,
196 const std::vector<std::string>& clang_options,
198 const std::string& libgeos_so_filename,
200 #ifdef HAVE_TORCH_TFS
201 const std::string& torch_lib_path,
204 const bool is_new_db)
205 : leaf_aggregator_(db_leaves)
206 , db_leaves_(db_leaves)
207 , string_leaves_(string_leaves)
208 , base_data_path_(base_data_path)
209 , random_gen_(std::random_device{}())
210 , session_id_dist_(0, INT32_MAX)
211 , jit_debug_(jit_debug)
212 , intel_jit_profile_(intel_jit_profile)
213 , allow_multifrag_(allow_multifrag)
214 , read_only_(read_only)
215 , allow_loop_joins_(allow_loop_joins)
216 , authMetadata_(authMetadata)
217 , system_parameters_(system_parameters)
218 , legacy_syntax_(legacy_syntax)
220 std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
221 , super_user_rights_(
false)
222 , idle_session_duration_(idle_session_duration * 60)
223 , max_session_duration_(max_session_duration * 60)
224 , enable_rendering_(enable_rendering)
225 , renderer_prefer_igpu_(renderer_prefer_igpu)
226 , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
227 , renderer_use_parallel_executors_(renderer_use_parallel_executors)
228 , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
229 , render_oom_retry_threshold_(render_oom_retry_threshold)
230 , render_mem_bytes_(render_mem_bytes)
231 , max_concurrent_render_sessions_(max_concurrent_render_sessions)
232 , reserved_gpu_mem_(reserved_gpu_mem)
233 , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
234 , renderer_enable_slab_allocation_{renderer_enable_slab_allocation}
235 , num_reader_threads_(num_reader_threads)
237 , libgeos_so_filename_(libgeos_so_filename)
239 #ifdef HAVE_TORCH_TFS
240 , torch_lib_path_(torch_lib_path)
242 , disk_cache_config_(disk_cache_config)
243 , udf_filename_(udf_filename)
244 , clang_path_(clang_path)
245 , clang_options_(clang_options)
246 , max_num_sessions_(-1) {
248 initialize(is_new_db);
249 resetSessionsStore();
253 size_t num_cpu_slots{0};
254 size_t num_gpu_slots{0};
255 size_t cpu_result_mem{0};
256 size_t cpu_buffer_pool_mem{0};
257 size_t gpu_buffer_pool_mem{0};
258 LOG(
INFO) <<
"Initializing Executor Resource Manager";
261 LOG(
INFO) <<
"\tSetting Executor resource pool avaiable CPU threads/slots to "
262 "user-specified value of "
266 LOG(
INFO) <<
"\tSetting Executor resource pool avaiable CPU threads/slots to default "
277 LOG(
INFO) <<
"\tSetting max per-query CPU threads to ratio of "
279 << num_cpu_slots <<
" available threads, or "
289 cpu_buffer_pool_mem =
data_mgr_->getCpuBufferPoolSize();
293 const size_t system_mem_bytes = DataMgr::getTotalSystemMemory();
294 CHECK_GT(system_mem_bytes,
size_t(0));
295 const size_t remaining_cpu_mem_bytes = system_mem_bytes >= cpu_buffer_pool_mem
296 ? system_mem_bytes - cpu_buffer_pool_mem
299 std::max(static_cast<size_t>(remaining_cpu_mem_bytes *
301 static_cast<size_t>(1UL << 32));
306 gpu_buffer_pool_mem =
data_mgr_->getGpuBufferPoolSize();
322 constexpr
double buffer_pool_max_occupancy{0.95};
323 const size_t conservative_cpu_buffer_pool_mem =
324 static_cast<size_t>(cpu_buffer_pool_mem * buffer_pool_max_occupancy);
325 const size_t conservative_gpu_buffer_pool_mem =
326 static_cast<size_t>(gpu_buffer_pool_mem * buffer_pool_max_occupancy);
329 <<
"\tSetting Executor resource pool reserved space for CPU buffer pool memory to "
331 if (gpu_buffer_pool_mem > 0UL) {
332 LOG(
INFO) <<
"\tSetting Executor resource pool reserved space for GPU buffer pool "
336 LOG(
INFO) <<
"\tSetting Executor resource pool reserved space for CPU result memory to "
343 conservative_cpu_buffer_pool_mem,
344 conservative_gpu_buffer_pool_mem,
360 <<
"The product of g_num_tuple_threshold_switch_to_baseline and "
361 "g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline exceeds 64 bits.";
369 for (
auto session : sessions) {
387 "Server already initialized; service restart required to activate any new "
401 LOG(
WARNING) <<
"This build isn't CUDA enabled, will run on CPU";
408 is_rendering_enabled =
false;
416 if (is_rendering_enabled) {
420 std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
424 cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
432 }
catch (
const std::exception& e) {
433 LOG(
ERROR) <<
"Unable to instantiate CudaMgr, falling back to CPU-only mode. "
437 is_rendering_enabled =
false;
452 }
catch (
const std::exception& e) {
453 LOG(
FATAL) <<
"Failed to initialize data manager: " << e.what();
459 std::string udf_ast_filename(
"");
463 const auto cuda_mgr =
data_mgr_->getCudaMgr();
465 cuda_mgr ? cuda_mgr->getDeviceArch()
471 if (!cuda_udf_ir_file.empty()) {
476 }
catch (
const std::exception& e) {
477 LOG(
FATAL) <<
"Failed to initialize UDF compiler: " << e.what();
483 }
catch (
const std::exception& e) {
484 LOG(
FATAL) <<
"Failed to initialize Calcite server: " << e.what();
492 }
catch (
const std::exception& e) {
493 LOG(
FATAL) <<
"Failed to initialize extension functions: " << e.what();
498 }
catch (
const std::exception& e) {
499 LOG(
FATAL) <<
"Failed to initialize table functions factory: " << e.what();
502 #ifdef HAVE_RUNTIME_LIBS
504 #ifdef HAVE_TORCH_TFS
509 }
catch (
const std::exception& e) {
510 LOG(
ERROR) <<
"Failed to load runtime libraries: " << e.what();
511 LOG(
ERROR) <<
"Support for runtime library table functions is disabled.";
518 std::vector<TUserDefinedFunction> udfs = {};
519 calcite_->setRuntimeExtensionFunctions(udfs, udtfs,
false);
520 }
catch (
const std::exception& e) {
521 LOG(
FATAL) <<
"Failed to register compile-time table functions: " << e.what();
526 LOG(
ERROR) <<
"No GPUs detected, falling back to CPU mode";
541 }
catch (
const std::exception& e) {
542 LOG(
FATAL) <<
"Failed to initialize system catalog: " << e.what();
548 if (is_rendering_enabled) {
561 }
catch (
const std::exception& e) {
562 LOG(
ERROR) <<
"Backend rendering disabled: " << e.what();
569 if (!libgeos_so_filename_.empty()) {
570 g_libgeos_so_filename.reset(
new std::string(libgeos_so_filename_));
571 LOG(
INFO) <<
"Overriding default geos library with '" + *g_libgeos_so_filename +
"'";
587 const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
594 std::string session_id;
599 calcite_->getInternalSessionProxyUserName(),
600 calcite_->getInternalSessionProxyPassword(),
607 std::make_shared<Catalog_Namespace::SessionInfo>(
609 CHECK(emplace_ret.second);
621 const std::string& username,
622 const std::string& dbname) {
625 std::string username2 = username;
626 std::string dbname2 = dbname;
628 std::shared_ptr<Catalog>
cat =
nullptr;
632 }
catch (std::exception& e) {
639 std::vector<DBObject> dbObjects;
640 dbObjects.push_back(dbObject);
643 " is not allowed to access database " + dbname2 +
".");
645 connect_impl(session_id, std::string(), dbname2, user_meta, cat, stdlog);
653 const std::string& inputToken,
654 const std::string& dbname) {
659 const std::string& username,
660 const std::string& passwd,
661 const std::string& dbname) {
665 std::string username2 = username;
666 std::string dbname2 = dbname;
668 std::shared_ptr<Catalog>
cat =
nullptr;
672 }
catch (std::exception& e) {
673 stdlog.appendNameValuePairs(
"user", username,
"db", dbname,
"exception", e.what());
680 std::vector<DBObject> dbObjects;
681 dbObjects.push_back(dbObject);
683 stdlog.appendNameValuePairs(
684 "user", username,
"db", dbname,
"exception",
"Missing Privileges");
686 " is not allowed to access database " + dbname2 +
".");
688 connect_impl(session_id, passwd, dbname2, user_meta, cat, stdlog);
695 const std::string& passwd,
696 const std::string& dbname,
698 std::shared_ptr<Catalog>
cat,
704 session_id = session_ptr->get_session_id();
713 ? std::vector<std::string>{{
"super"}}
728 const auto session_id = session_ptr->get_session_id();
729 std::exception_ptr leaf_exception =
nullptr;
735 leaf_exception = std::current_exception();
742 if (leaf_exception) {
743 std::rethrow_exception(leaf_exception);
748 const std::string& dbname) {
752 auto stdlog =
STDLOG(session_ptr);
754 std::string dbname2 = dbname;
757 dbname2, session_ptr->get_currentUser().userName);
758 session_ptr->set_catalog_ptr(cat);
763 }
catch (std::exception& e) {
769 const TSessionId& session1_id_or_json) {
773 auto stdlog =
STDLOG(session1_ptr);
778 std::shared_ptr<Catalog>
cat = session1_ptr->get_catalog_ptr();
780 session2_id = session2_ptr->get_session_id();
787 }
catch (std::exception& e) {
793 const TSessionId& interrupt_session_id_or_json) {
802 auto&
cat = session_ptr->getCatalog();
803 auto stdlog =
STDLOG(session_ptr);
805 const auto allow_query_interrupt =
808 const auto dbname = cat.getCurrentDB().dbName;
819 auto target_executor_ids =
820 executor->getExecutorIdsRunningQuery(query_request_info.
sessionId());
821 if (target_executor_ids.empty()) {
823 executor->getSessionLock());
824 if (executor->checkIsQuerySessionEnrolled(query_request_info.
sessionId(),
825 session_read_lock)) {
826 session_read_lock.unlock();
827 VLOG(1) <<
"Received interrupt: "
828 <<
"User " << session_ptr->get_currentUser().userLoggable()
829 <<
", Database " << dbname << std::endl;
830 executor->interrupt(query_request_info.
sessionId(),
834 for (
auto& executor_id : target_executor_ids) {
835 VLOG(1) <<
"Received interrupt: "
836 <<
"Executor " << executor_id <<
", User "
837 << session_ptr->get_currentUser().userLoggable() <<
", Database "
838 << dbname << std::endl;
840 target_executor->interrupt(query_request_info.
sessionId(),
845 LOG(
INFO) <<
"User " << session_ptr->get_currentUser().userName
846 <<
" interrupted session with database " << dbname << std::endl;
853 return TRole::type::AGGREGATOR;
855 return TRole::type::LEAF;
857 return TRole::type::SERVER;
860 const TSessionId& session_id_or_json) {
868 _return.rendering_enabled = rendering_enabled;
872 _return.poly_rendering_enabled = rendering_enabled;
874 _return.renderer_status_json =
879 const TSessionId& session_id_or_json) {
899 LOG(
INFO) <<
"get_status() called in session-less mode";
905 ret.rendering_enabled = rendering_enabled;
909 ret.poly_rendering_enabled = rendering_enabled;
911 ret.renderer_status_json =
915 _return.push_back(ret);
917 std::vector<TServerStatus> leaf_status =
919 _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
924 const TSessionId& session_id_or_json) {
930 const auto cuda_mgr =
data_mgr_->getCudaMgr();
932 ret.num_gpu_hw = cuda_mgr->getDeviceCount();
933 ret.start_gpu = cuda_mgr->getStartGpu();
934 if (ret.start_gpu >= 0) {
935 ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
938 for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
939 TGpuSpecification gpu_spec;
940 auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
941 gpu_spec.num_sm = deviceProperties->numMPs;
942 gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
943 gpu_spec.memory = deviceProperties->globalMem;
944 gpu_spec.compute_capability_major = deviceProperties->computeMajor;
945 gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
946 ret.gpu_info.push_back(gpu_spec);
951 ret.num_cpu_hw = std::thread::hardware_concurrency();
955 _return.hardware_info.push_back(ret);
959 const TSessionId& session_id_or_json) {
964 auto stdlog =
STDLOG(session_ptr);
966 auto user_metadata = session_ptr->get_currentUser();
967 _return.user = user_metadata.userName;
968 _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
969 _return.start_time = session_ptr->get_start_time();
970 _return.is_super = user_metadata.isSuper;
983 <<
"element types of arrays should always be nullable";
985 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
987 bool is_null = !array_tv->is_initialized();
989 const auto& vec = array_tv->get();
990 for (
const auto& elem_tv : vec) {
994 column.data.arr_col.push_back(tColumn);
995 column.nulls.push_back(is_null && !ti.
get_notnull());
997 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
999 auto s_n = boost::get<NullableString>(scalar_tv);
1000 auto s = boost::get<std::string>(s_n);
1002 column.data.str_col.push_back(*s);
1004 column.data.str_col.emplace_back(
"");
1005 auto null_p = boost::get<void*>(s_n);
1006 CHECK(null_p && !*null_p);
1010 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1012 bool is_null = !array_tv->is_initialized();
1016 const auto& vec = array_tv->get();
1017 for (
const auto& elem_tv : vec) {
1020 column.data.arr_col.push_back(tColumn);
1021 column.nulls.push_back(
false);
1024 column.data.arr_col.push_back(tColumn);
1025 column.nulls.push_back(is_null && !ti.
get_notnull());
1030 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1032 if (boost::get<int64_t>(scalar_tv)) {
1033 int64_t data = *(boost::get<int64_t>(scalar_tv));
1036 double val =
static_cast<double>(data);
1038 val /= pow(10.0, std::abs(ti.
get_scale()));
1040 column.data.real_col.push_back(val);
1042 column.data.int_col.push_back(data);
1071 column.nulls.push_back(
false);
1073 }
else if (boost::get<double>(scalar_tv)) {
1074 double data = *(boost::get<double>(scalar_tv));
1075 column.data.real_col.push_back(data);
1081 }
else if (boost::get<float>(scalar_tv)) {
1083 float data = *(boost::get<float>(scalar_tv));
1084 column.data.real_col.push_back(data);
1086 }
else if (boost::get<NullableString>(scalar_tv)) {
1087 auto s_n = boost::get<NullableString>(scalar_tv);
1088 auto s = boost::get<std::string>(s_n);
1090 column.data.str_col.push_back(*s);
1092 column.data.str_col.emplace_back(
"");
1093 auto null_p = boost::get<void*>(s_n);
1094 CHECK(null_p && !*null_p);
1105 const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1109 <<
"element types of arrays should always be nullable";
1110 const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1112 if (array_tv->is_initialized()) {
1113 const auto& vec = array_tv->get();
1114 for (
const auto& elem_tv : vec) {
1116 datum.val.arr_val.push_back(scalar_col_val);
1119 datum.is_null =
false;
1121 datum.is_null =
true;
1125 if (boost::get<int64_t>(scalar_tv)) {
1126 int64_t data = *(boost::get<int64_t>(scalar_tv));
1129 double val =
static_cast<double>(data);
1131 val /= pow(10.0, std::abs(ti.
get_scale()));
1133 datum.val.real_val = val;
1135 datum.val.int_val = data;
1149 datum.is_null = (datum.val.int_val ==
NULL_INT);
1154 datum.is_null = (datum.val.int_val ==
NULL_BIGINT);
1161 datum.is_null = (datum.val.int_val ==
NULL_BIGINT);
1164 datum.is_null =
false;
1166 }
else if (boost::get<double>(scalar_tv)) {
1167 datum.val.real_val = *(boost::get<double>(scalar_tv));
1169 datum.is_null = (datum.val.real_val ==
NULL_FLOAT);
1171 datum.is_null = (datum.val.real_val ==
NULL_DOUBLE);
1173 }
else if (boost::get<float>(scalar_tv)) {
1175 datum.val.real_val = *(boost::get<float>(scalar_tv));
1176 datum.is_null = (datum.val.real_val ==
NULL_FLOAT);
1177 }
else if (boost::get<NullableString>(scalar_tv)) {
1178 auto s_n = boost::get<NullableString>(scalar_tv);
1179 auto s = boost::get<std::string>(s_n);
1181 datum.val.str_val = *s;
1183 auto null_p = boost::get<void*>(s_n);
1184 CHECK(null_p && !*null_p);
1194 TQueryResult& _return,
1196 const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1197 const std::string& query_str,
1198 const bool column_format,
1199 const std::string& nonce,
1200 const int32_t first_n,
1201 const int32_t at_most_n,
1202 const bool use_calcite) {
1203 _return.total_time_ms = 0;
1204 _return.nonce = nonce;
1206 switch (pw.getQueryType()) {
1208 _return.query_type = TQueryType::READ;
1209 VLOG(1) <<
"query type: READ";
1213 _return.query_type = TQueryType::WRITE;
1214 VLOG(1) <<
"query type: WRITE";
1218 _return.query_type = TQueryType::SCHEMA_READ;
1219 VLOG(1) <<
"query type: SCHEMA READ";
1223 _return.query_type = TQueryType::SCHEMA_WRITE;
1224 VLOG(1) <<
"query type: SCHEMA WRITE";
1228 _return.query_type = TQueryType::UNKNOWN;
1240 session_ptr->get_executor_device_type(),
1246 _return, result, query_state_proxy, column_format, first_n, at_most_n);
1253 const bool column_format,
1254 const int32_t first_n,
1255 const int32_t at_most_n) {
1257 if (result.
empty()) {
1290 const TSessionId& session_id_or_json,
1291 const std::string& query_str,
1292 const bool column_format,
1293 const std::string& nonce,
1294 const int32_t first_n,
1295 const int32_t at_most_n) {
1298 const std::string exec_ra_prefix =
"execute relalg";
1299 const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1301 use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1304 auto stdlog =
STDLOG(session_ptr, query_state);
1306 stdlog.appendNameValuePairs(
"nonce", nonce);
1309 ScopeGuard reset_was_deferred_copy_from = [
this, &session_ptr] {
1313 if (first_n >= 0 && at_most_n >= 0) {
1323 query_state->createQueryStateProxy(),
1324 query_state->getQueryStr(),
1331 _return.nonce = nonce;
1334 query_state->createQueryStateProxy(),
1344 std::string debug_json = timer.stopAndGetJson();
1345 if (!debug_json.empty()) {
1346 _return.__set_debug(std::move(debug_json));
1348 stdlog.appendNameValuePairs(
1349 "execution_time_ms",
1350 _return.execution_time_ms,
1352 stdlog.duration<std::chrono::milliseconds>());
1355 }
catch (
const std::exception& e) {
1356 if (strstr(e.what(),
"java.lang.NullPointerException")) {
1358 }
else if (strstr(e.what(),
"SQL Error: Encountered \";\"")) {
1360 }
else if (strstr(e.what(),
"SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1369 const TSessionId& session_id_or_json,
1370 const std::string& query_str,
1371 const bool column_format,
1372 const int32_t first_n,
1373 const int32_t at_most_n,
1377 const std::string exec_ra_prefix =
"execute relalg";
1378 const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1380 use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1385 auto stdlog =
STDLOG(session_ptr, query_state);
1389 ScopeGuard reset_was_deferred_copy_from = [
this, &session_ptr] {
1393 if (first_n >= 0 && at_most_n >= 0) {
1398 query_state->createQueryStateProxy(),
1400 session_ptr->get_executor_device_type(),
1410 stdlog.appendNameValuePairs(
1411 "execution_time_ms",
1414 stdlog.duration<std::chrono::milliseconds>());
1417 }
catch (
const std::exception& e) {
1418 if (strstr(e.what(),
"java.lang.NullPointerException")) {
1420 }
else if (strstr(e.what(),
"SQL Error: Encountered \";\"")) {
1422 }
else if (strstr(e.what(),
"SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1431 int64_t total_time_ms(0);
1443 TCreateParams create_params;
1444 if (deferred_copy_from_state->partitions ==
"REPLICATED") {
1445 create_params.is_replicated =
true;
1451 deferred_copy_from_state->table,
1452 deferred_copy_from_state->file_name,
1453 deferred_copy_from_state->copy_params,
1458 return total_time_ms;
1462 const TSessionId& session_id_or_json,
1463 const std::string& query_str,
1465 const int32_t device_id,
1466 const int32_t first_n,
1473 auto stdlog =
STDLOG(session_ptr, query_state);
1475 const auto executor_device_type = session_ptr->get_executor_device_type();
1477 if (results_device_type == TDeviceType::GPU) {
1484 if (device_id < 0 || device_id >=
data_mgr_->getCudaMgr()->getDeviceCount()) {
1486 std::string(
"Invalid device_id or unavailable GPU with this ID"));
1492 "Only read queries supported for the Arrow sql_execute_df endpoint."));
1496 "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1502 query_state->createQueryStateProxy(),
1504 executor_device_type,
1510 const auto result_set = execution_result.
getRows();
1511 const auto executor_results_device_type = results_device_type == TDeviceType::CPU
1514 _return.execution_time_ms =
1516 const auto converter = std::make_unique<ArrowResultSetConverter>(
1519 executor_results_device_type,
1525 _return.arrow_conversion_time_ms +=
1528 std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1529 _return.sm_size = arrow_result.sm_size;
1531 std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1533 std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1538 std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1540 _return.df_size = arrow_result.df_size;
1544 const TSessionId& session_id_or_json,
1545 const std::string& query_str,
1546 const int32_t device_id,
1547 const int32_t first_n) {
1553 request_info.
json(),
1558 TArrowTransport::SHARED_MEMORY);
1563 const TDataFrame& df,
1565 const int32_t device_id) {
1569 std::string serialized_cuda_handle =
"";
1570 if (device_type == TDeviceType::GPU) {
1574 ex.error_msg = std::string(
1575 "Current data frame handle is not bookkept or been inserted "
1583 std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1584 std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1586 sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1595 const TSessionId& session_id_or_json,
1596 const std::string& query_str) {
1603 stdlog.setQueryState(query_state);
1606 if (
ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
1607 throw std::runtime_error(
"Can only validate SELECT statements.");
1612 TPlanResult parse_result;
1614 std::tie(parse_result, locks) =
parse_to_ra(query_state->createQueryStateProxy(),
1615 query_state->getQueryStr(),
1620 const auto query_ra = parse_result.plan_result;
1621 _return =
validateRelAlg(query_ra, query_state->createQueryStateProxy());
1622 }
catch (
const std::exception& e) {
1638 const std::string& sql) {
1639 boost::regex id_regex{R
"(([[:alnum:]]|_|\.)+)",
1640 boost::regex::extended | boost::regex::icase};
1641 boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1642 boost::sregex_token_iterator end;
1643 std::unordered_set<std::string> uc_column_names;
1644 std::unordered_set<std::string> uc_column_table_qualifiers;
1645 for (; tok_it != end; ++tok_it) {
1646 std::string column_name = *tok_it;
1647 std::vector<std::string> column_tokens;
1648 boost::split(column_tokens, column_name, boost::is_any_of(
"."));
1649 if (column_tokens.size() == 2) {
1651 uc_column_table_qualifiers.insert(
to_upper(column_tokens.front()));
1653 uc_column_names.insert(
to_upper(column_name));
1656 return {uc_column_names, uc_column_table_qualifiers};
1662 const TSessionId& session_id_or_json,
1663 const std::string& sql,
1668 std::vector<std::string> visible_tables;
1672 proj_tokens.uc_column_names, visible_tables, stdlog);
1674 compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1675 proj_tokens.uc_column_table_qualifiers.end());
1680 [&compatible_table_names](
const TCompletionHint& lhs,
const TCompletionHint& rhs) {
1681 if (lhs.type == TCompletionHintType::TABLE &&
1682 rhs.type == TCompletionHintType::TABLE) {
1685 if (compatible_table_names.find(
to_upper(lhs.hints.back())) !=
1686 compatible_table_names.end() &&
1687 compatible_table_names.find(
to_upper(rhs.hints.back())) ==
1688 compatible_table_names.end()) {
1692 return lhs.type < rhs.type;
1697 std::vector<std::string>& visible_tables,
1699 const std::string& sql,
1707 calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1708 }
catch (
const std::exception& e) {
1710 ex.error_msg = std::string(e.what());
1714 boost::regex from_expr{R
"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1715 const size_t length_to_cursor =
1716 cursor < 0 ? sql.size() : std::min(sql.size(),
static_cast<size_t>(cursor));
1718 if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1727 std::vector<std::string>& visible_tables,
1728 const std::string& sql,
1730 const auto last_word =
1732 boost::regex select_expr{R
"(\s*select\s+)",
1733 boost::regex::extended | boost::regex::icase};
1734 const size_t length_to_cursor =
1735 cursor < 0 ? sql.size() : std::min(sql.size(),
static_cast<size_t>(cursor));
1738 if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1749 const std::string kFromKeyword{
"FROM"};
1750 if (boost::istarts_with(kFromKeyword, last_word)) {
1751 TCompletionHint keyword_hint;
1752 keyword_hint.type = TCompletionHintType::KEYWORD;
1753 keyword_hint.replaced = last_word;
1754 keyword_hint.hints.emplace_back(kFromKeyword);
1755 hints.push_back(keyword_hint);
1758 const std::string kSelectKeyword{
"SELECT"};
1759 if (boost::istarts_with(kSelectKeyword, last_word)) {
1760 TCompletionHint keyword_hint;
1761 keyword_hint.type = TCompletionHintType::KEYWORD;
1762 keyword_hint.replaced = last_word;
1763 keyword_hint.hints.emplace_back(kSelectKeyword);
1764 hints.push_back(keyword_hint);
1769 std::unordered_map<std::string, std::unordered_set<std::string>>
1772 std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1773 for (
auto it = table_names.begin(); it != table_names.end();) {
1774 TTableDetails table_details;
1777 }
catch (
const TDBException& e) {
1779 it = table_names.erase(it);
1782 for (
const auto& column_type : table_details.row_desc) {
1783 column_names_by_table[*it].emplace(column_type.col_name);
1787 return column_names_by_table;
1796 const std::unordered_set<std::string>& uc_column_names,
1797 std::vector<std::string>& table_names,
1799 std::unordered_set<std::string> compatible_table_names_by_column;
1800 for (
auto it = table_names.begin(); it != table_names.end();) {
1801 TTableDetails table_details;
1804 }
catch (
const TDBException& e) {
1806 it = table_names.erase(it);
1809 for (
const auto& column_type : table_details.row_desc) {
1810 if (uc_column_names.find(
to_upper(column_type.col_name)) != uc_column_names.end()) {
1811 compatible_table_names_by_column.emplace(
to_upper(*it));
1817 return compatible_table_names_by_column;
1821 const bool is_update_delete) {
1828 TQueryResult query_result;
1830 auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1835 parent_thread_local_ids =
1851 auto result_future = execute_rel_alg_task->get_future();
1852 result_future.get();
1855 const auto& row_desc = query_result.row_set.row_desc;
1856 const auto& targets_meta = execution_result.getTargetsMeta();
1857 CHECK_EQ(row_desc.size(), targets_meta.size());
1860 TRowDescriptor fixedup_row_desc;
1861 for (
size_t i = 0; i < row_desc.size(); i++) {
1862 const auto& col_desc = row_desc[i];
1863 auto fixedup_col_desc = col_desc;
1864 if (col_desc.col_type.encoding == TEncodingType::DICT &&
1865 col_desc.col_type.comp_param > 0) {
1866 const auto& type_info = targets_meta[i].get_type_info();
1869 type_info.getStringDictKey().db_id);
1870 const auto dd = cat->getMetadataForDict(col_desc.col_type.comp_param,
false);
1872 fixedup_col_desc.col_type.comp_param = dd->dictNBits;
1874 fixedup_row_desc.push_back(fixedup_col_desc);
1876 return fixedup_row_desc;
1880 const TSessionId& session_id_or_json) {
1884 auto session_ptr = stdlog.getConstSessionInfo();
1885 if (!session_ptr->get_currentUser().isSuper) {
1890 session_ptr->getCatalog().getCurrentDB().dbId);
1893 false,
true, session_ptr->get_currentUser().userName);
1898 const std::string& granteeName,
1899 const std::string& roleName) {
1903 const auto stdlog =
STDLOG(session_ptr);
1904 const auto current_user = session_ptr->get_currentUser();
1905 if (!current_user.isSuper) {
1907 user && current_user.userName != granteeName) {
1910 current_user.userName, granteeName,
true)) {
1912 "Only super users can check roles assignment that have not been directly "
1913 "granted to a user.");
1921 TDBObject outObject;
1922 outObject.objectName = inObject.
getName();
1923 outObject.grantee = roleName;
1976 const int type_val =
static_cast<int>(inObject.
getType());
1977 CHECK(type_val >= 0 && type_val < 6);
1983 const TDBObjectPermissions& permissions) {
1984 if (!permissions.__isset.database_permissions_) {
1987 auto perms = permissions.database_permissions_;
1990 (perms.view_sql_editor_ &&
2000 const TDBObjectPermissions& permissions) {
2001 if (!permissions.__isset.table_permissions_) {
2004 auto perms = permissions.table_permissions_;
2020 const TDBObjectPermissions& permissions) {
2021 if (!permissions.__isset.dashboard_permissions_) {
2024 auto perms = permissions.dashboard_permissions_;
2036 const TDBObjectPermissions& permissions) {
2037 if (!permissions.__isset.view_permissions_) {
2040 auto perms = permissions.view_permissions_;
2054 const TDBObjectPermissions& permissions) {
2055 CHECK(permissions.__isset.server_permissions_);
2056 auto perms = permissions.server_permissions_;
2068 const std::string& granteeName,
2069 const std::string& objectName,
2071 const TDBObjectPermissions& permissions) {
2075 auto stdlog =
STDLOG(session_ptr);
2076 auto const&
cat = session_ptr->getCatalog();
2077 auto const& current_user = session_ptr->get_currentUser();
2079 current_user.userName, granteeName,
false)) {
2081 "Users except superusers can only check privileges for self or roles granted "
2095 std::string func_name;
2096 switch (objectType) {
2099 func_name =
"database";
2103 func_name =
"table";
2107 func_name =
"dashboard";
2115 func_name =
"server";
2120 DBObject req_object(objectName, type);
2124 if (grantee_object) {
2134 const TSessionId& session_id_or_json,
2135 const std::string& roleName) {
2139 auto stdlog =
STDLOG(session_ptr);
2140 auto const& user = session_ptr->get_currentUser();
2141 if (!user.isSuper &&
2147 auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2148 for (
auto& dbObject : *rl->getDbObjects(
true)) {
2149 if (dbObject.first.dbId != dbId) {
2155 TDBObjectsForRole.push_back(tdbObject);
2163 const TSessionId& session_id_or_json,
2164 const std::string& objectName,
2169 auto stdlog =
STDLOG(session_ptr);
2170 const auto&
cat = session_ptr->getCatalog();
2192 DBObject object_to_find(objectName, object_type);
2197 if (objectName ==
"") {
2198 object_to_find =
DBObject(-1, object_type);
2200 object_to_find =
DBObject(std::stoi(objectName), object_type);
2203 !objectName.empty()) {
2205 auto td =
cat.getMetadataForTable(objectName,
false);
2208 object_to_find =
DBObject(objectName, object_type);
2212 }
catch (
const std::exception&) {
2217 DBObject object_to_find_dblevel(
"", object_type);
2220 if (session_ptr->get_currentUser().isSuper) {
2224 session_ptr->get_currentUser().userId};
2225 dbObj.setName(
"super");
2226 TDBObjects.push_back(
2230 std::vector<std::string> grantees =
2232 session_ptr->get_currentUser().isSuper,
2233 session_ptr->get_currentUser().userName);
2234 for (
const auto& grantee : grantees) {
2237 if (gr && (object_found = gr->findDbObject(object_to_find.
getObjectKey(),
true))) {
2242 (object_found = gr->findDbObject(object_to_find_dblevel.
getObjectKey(),
true))) {
2249 std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2250 std::vector<std::string>& roles,
2251 const std::string& granteeName,
2255 if (session_ptr->get_currentUser().isSuper) {
2256 roles = grantee->getRoles(!effective);
2257 }
else if (grantee->isUser()) {
2258 if (session_ptr->get_currentUser().userName == granteeName) {
2259 roles = grantee->getRoles(!effective);
2262 "Only a superuser is authorized to request list of roles granted to another "
2266 CHECK(!grantee->isUser());
2270 session_ptr->get_currentUser().userName, granteeName,
false)) {
2271 roles = grantee->getRoles(!effective);
2282 const TSessionId& session_id_or_json,
2283 const std::string& granteeName) {
2289 auto session_ptr = stdlog.getConstSessionInfo();
2294 const TSessionId& session_id_or_json,
2295 const std::string& granteeName) {
2299 auto session_ptr = stdlog.getConstSessionInfo();
2305 const std::map<std::string, std::vector<std::string>>& table_col_names) {
2306 std::ostringstream oss;
2307 for (
const auto& [table_name, col_names] : table_col_names) {
2308 oss <<
":" << table_name;
2309 for (
const auto& col_name : col_names) {
2310 oss <<
"," << col_name;
2318 TPixelTableRowResult& _return,
2319 const TSessionId& session_id_or_json,
2320 const int64_t widget_id,
2321 const TPixel& pixel,
2322 const std::map<std::string, std::vector<std::string>>& table_col_names,
2323 const bool column_format,
2324 const int32_t pixel_radius,
2325 const std::string& nonce) {
2329 auto stdlog =
STDLOG(session_ptr,
2358 }
catch (std::exception& e) {
2365 TColumnType col_type;
2389 col_type.col_type.comp_param = 0;
2396 col_type.col_type.comp_param = dd->dictNBits;
2398 col_type.col_type.comp_param =
2411 const TSessionId& session_id_or_json,
2412 const std::string& table_name,
2413 const bool include_system_columns) {
2423 TTableDetails& _return,
2424 const TSessionId& session_id_or_json,
2425 const std::string& table_name,
2426 const std::string& database_name) {
2436 const TSessionId& session_id_or_json,
2437 const std::string& table_name) {
2449 const TSessionId& session_id_or_json,
2450 const std::string& table_name,
2451 const std::string& database_name) {
2466 CHECK(foreign_table);
2467 TTableRefreshInfo refresh_info;
2468 const auto& update_type =
2470 CHECK(update_type.has_value());
2473 }
else if (update_type.value() ==
2475 refresh_info.update_type = TTableRefreshUpdateType::APPEND;
2477 UNREACHABLE() <<
"Unexpected refresh update type: " << update_type.value();
2480 const auto& timing_type =
2482 CHECK(timing_type.has_value());
2484 refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
2485 refresh_info.interval_count = -1;
2486 }
else if (timing_type.value() ==
2488 refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
2489 const auto& start_date_time = foreign_table->getOption(
2491 CHECK(start_date_time.has_value());
2492 auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
2493 refresh_info.start_date_time =
2495 const auto& interval =
2497 CHECK(interval.has_value());
2498 const auto& interval_str = interval.value();
2499 refresh_info.interval_count =
2500 std::stoi(interval_str.substr(0, interval_str.length() - 1));
2501 auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
2502 if (interval_type ==
'H') {
2503 refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
2504 }
else if (interval_type ==
'D') {
2505 refresh_info.interval_type = TTableRefreshIntervalType::DAY;
2506 }
else if (interval_type ==
'S') {
2508 refresh_info.interval_type = TTableRefreshIntervalType::NONE;
2510 UNREACHABLE() <<
"Unexpected interval type: " << interval_str;
2513 UNREACHABLE() <<
"Unexpected refresh timing type: " << timing_type.value();
2515 if (foreign_table->last_refresh_time !=
2518 {
kTIMESTAMP}, foreign_table->last_refresh_time);
2520 if (foreign_table->next_refresh_time !=
2523 {
kTIMESTAMP}, foreign_table->next_refresh_time);
2525 return refresh_info;
2531 const std::string& table_name,
2532 const bool get_system,
2533 const bool get_physical,
2534 const std::string& database_name) {
2537 auto cat = (database_name.empty())
2538 ? &session_info->getCatalog()
2543 const auto td_with_lock =
2545 *
cat, table_name,
false);
2546 const auto td = td_with_lock();
2549 bool have_privileges_on_view_sources =
true;
2555 const auto [query_ra, locks] =
parse_to_ra(query_state->createQueryStateProxy(),
2556 query_state->getQueryStr(),
2562 calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2564 }
catch (
const std::runtime_error&) {
2565 have_privileges_on_view_sources =
false;
2569 validateRelAlg(query_ra.plan_result, query_state->createQueryStateProxy());
2571 throw std::runtime_error(
2572 "Unable to access view " + table_name +
2573 ". The view may not exist, or the logged in user may not "
2574 "have permission to access the view.");
2576 }
catch (
const std::exception& e) {
2577 throw std::runtime_error(
"View '" + table_name +
2578 "' query has failed with an error: '" +
2579 std::string(e.what()) +
2580 "'.\nThe view must be dropped and re-created to "
2581 "resolve the error. \nQuery:\n" +
2582 query_state->getQueryStr());
2586 const auto col_descriptors =
cat->getAllColumnMetadataForTable(
2587 td->tableId, get_system,
true, get_physical);
2588 const auto deleted_cd =
cat->getDeletedColumn(td);
2589 for (
const auto cd : col_descriptors) {
2590 if (cd == deleted_cd) {
2596 throw std::runtime_error(
2597 "Unable to access table " + table_name +
2598 ". The table may not exist, or the logged in user may not "
2599 "have permission to access the table.");
2602 _return.fragment_size = td->maxFragRows;
2603 _return.page_size = td->fragPageSize;
2604 _return.max_rows = td->maxRows;
2606 (have_privileges_on_view_sources ? td->viewSQL
2607 :
"[Not enough privileges to see the view SQL]");
2608 _return.shard_count = td->nShards * std::max(
g_leaf_count,
size_t(1));
2609 if (td->nShards > 0) {
2610 auto cd =
cat->getMetadataForColumn(td->tableId, td->shardedColumnId);
2612 _return.sharded_column_name = cd->columnName;
2614 _return.key_metainfo = td->keyMetainfo;
2616 _return.partition_detail =
2617 td->partitions.empty()
2618 ? TPartitionDetail::DEFAULT
2620 ? TPartitionDetail::REPLICATED
2621 : (td->partitions ==
"SHARDED" ? TPartitionDetail::SHARDED
2622 : TPartitionDetail::OTHER));
2624 _return.table_type = TTableType::VIEW;
2625 }
else if (td->isTemporaryTable()) {
2626 _return.table_type = TTableType::TEMPORARY;
2627 }
else if (td->isForeignTable()) {
2628 _return.table_type = TTableType::FOREIGN;
2631 _return.table_type = TTableType::DEFAULT;
2634 }
catch (
const std::runtime_error& e) {
2640 const TSessionId& session_id_or_json,
2641 const std::string& link) {
2645 auto stdlog =
STDLOG(session_ptr);
2647 auto const&
cat = session_ptr->getCatalog();
2652 _return.view_state = ld->viewState;
2653 _return.view_name = ld->link;
2654 _return.update_time = ld->updateTime;
2655 _return.view_metadata = ld->viewMetadata;
2664 if (user_metadata.isSuper) {
2670 std::vector<DBObject> privObjects = {dbObject};
2678 const std::string& database_name) {
2679 if (database_name.empty()) {
2687 table_names = request_cat->getTableNamesForUser(session_info.
get_currentUser(),
2693 const TSessionId& session_id_or_json) {
2703 const TSessionId& session_id_or_json,
2704 const std::string& database_name) {
2711 *stdlog.getConstSessionInfo(),
2717 const TSessionId& session_id_or_json) {
2726 const TSessionId& session_id_or_json) {
2737 const bool with_table_locks) {
2741 const auto tables =
cat.getAllTableMetadataCopy();
2742 _return.reserve(
tables.size());
2744 for (
const auto& td :
tables) {
2745 if (td.shard >= 0) {
2755 ret.table_name = td.tableName;
2756 ret.is_view = td.isView;
2758 ret.shard_count = td.nShards;
2759 ret.max_rows = td.maxRows;
2760 ret.table_id = td.tableId;
2762 std::vector<TTypeInfo> col_types;
2763 std::vector<std::string> col_names;
2764 size_t num_cols = 0;
2767 TPlanResult parse_result;
2771 const auto query_ra = parse_result.plan_result;
2786 num_cols = result.row_set.row_desc.size();
2787 for (
const auto& col : result.row_set.row_desc) {
2788 if (col.is_physical) {
2792 col_types.push_back(col.col_type);
2793 col_names.push_back(col.col_name);
2795 }
catch (std::exception& e) {
2796 LOG(
WARNING) <<
"get_tables_meta: Ignoring broken view: " << td.tableName;
2801 const auto col_descriptors =
2802 cat.getAllColumnMetadataForTable(td.tableId,
false,
true,
false);
2803 const auto deleted_cd =
cat.getDeletedColumn(&td);
2804 for (
const auto cd : col_descriptors) {
2805 if (cd == deleted_cd) {
2809 col_names.push_back(cd->columnName);
2811 num_cols = col_descriptors.size();
2815 }
catch (
const std::runtime_error& e) {
2820 ret.num_cols = num_cols;
2821 std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2822 std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2824 _return.push_back(ret);
2829 const TSessionId& session_id_or_json) {
2834 auto session_ptr = stdlog.getConstSessionInfo();
2836 stdlog.setQueryState(query_state);
2842 }
catch (
const std::exception& e) {
2848 const TSessionId& session_id_or_json) {
2853 auto session_ptr = stdlog.getConstSessionInfo();
2854 std::list<Catalog_Namespace::UserMetadata> user_list;
2856 if (!session_ptr->get_currentUser().isSuper) {
2858 session_ptr->getCatalog().getCurrentDB().dbId);
2862 for (
auto u : user_list) {
2863 user_names.push_back(u.userName);
2892 auto session_ptr = stdlog.getConstSessionInfo();
2893 if (!session_ptr->get_currentUser().isSuper) {
2906 }
catch (
const std::exception& e) {
2915 auto session_ptr = stdlog.getConstSessionInfo();
2916 if (!session_ptr->get_currentUser().isSuper) {
2929 }
catch (
const std::exception& e) {
2939 auto session_ptr = stdlog.getConstSessionInfo();
2940 if (!session_ptr->get_currentUser().isSuper) {
2953 auto session_ptr = stdlog.getConstSessionInfo();
2954 if (!session_ptr->get_currentUser().isSuper) {
2959 }
catch (
const std::exception& e) {
2967 auto session_ptr = stdlog.getConstSessionInfo();
2968 if (!session_ptr->get_currentUser().isSuper) {
2973 }
catch (
const std::exception& e) {
2979 const TSessionId& leaf_session_id_or_json,
2980 const std::string& start_time_str,
2981 const std::string&
label,
2982 bool for_running_query_kernel) {
2989 auto session_ptr = stdlog.getConstSessionInfo();
2992 executor->enrollQuerySession(parent_request_info.
sessionId(),
2996 for_running_query_kernel
2997 ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2998 : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
3002 const TSessionId& leaf_session_id_or_json,
3003 const std::string& start_time_str,
3004 const std::string&
label,
3005 bool for_running_query_kernel) {
3013 executor->clearQuerySessionStatus(parent_request_info.
sessionId(), start_time_str);
3021 const TSessionId& session_id_or_json,
3022 const std::string& memory_level) {
3027 std::vector<Data_Namespace::MemoryInfo> internal_memory;
3028 if (!memory_level.compare(
"gpu")) {
3036 for (
auto memInfo : internal_memory) {
3037 TNodeMemoryInfo nodeInfo;
3038 nodeInfo.page_size = memInfo.pageSize;
3039 nodeInfo.max_num_pages = memInfo.maxNumPages;
3040 nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
3041 nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
3042 for (
auto gpu : memInfo.nodeMemoryData) {
3044 md.slab = gpu.slabNum;
3045 md.start_page = gpu.startPage;
3046 md.num_pages = gpu.numPages;
3047 md.touch = gpu.touch;
3048 md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
3050 nodeInfo.node_memory_data.push_back(md);
3052 _return.push_back(nodeInfo);
3057 const TSessionId& session_id_or_json) {
3062 auto session_ptr = stdlog.getConstSessionInfo();
3063 const auto& user = session_ptr->get_currentUser();
3066 for (
auto& db : dbs) {
3068 dbinfo.db_name = std::move(db.dbName);
3069 dbinfo.db_owner = std::move(db.dbOwnerName);
3070 dbinfos.push_back(std::move(dbinfo));
3075 auto executor =
get_session_ptr(session_id)->get_executor_device_type();
3078 return TExecuteMode::CPU;
3080 return TExecuteMode::GPU;
3085 return TExecuteMode::CPU;
3092 auto stdlog =
STDLOG(session_ptr);
3101 throw std::runtime_error(
"Cannot import a sharded table directly to a leaf");
3106 const std::vector<std::string>& column_names) {
3107 std::unordered_set<std::string> unique_names;
3108 for (
const auto&
name : column_names) {
3110 if (unique_names.find(lower_name) != unique_names.end()) {
3113 unique_names.insert(lower_name);
3116 for (
const auto& cd : descs) {
3117 auto iter = unique_names.find(
to_lower(cd->columnName));
3118 if (iter != unique_names.end()) {
3119 unique_names.erase(iter);
3122 if (!unique_names.empty()) {
3132 const std::vector<std::string>& column_names) {
3133 std::vector<int> desc_to_column_ids;
3134 if (column_names.empty()) {
3136 for (
const auto& cd : descs) {
3137 if (!cd->isGeoPhyCol) {
3138 desc_to_column_ids.push_back(col_idx);
3143 for (
const auto& cd : descs) {
3144 if (!cd->isGeoPhyCol) {
3146 for (
size_t j = 0; j < column_names.size(); ++j) {
3149 desc_to_column_ids.push_back(j);
3154 if (!cd->columnType.get_notnull()) {
3155 desc_to_column_ids.push_back(-1);
3158 "' cannot be omitted due to NOT NULL constraint");
3164 return desc_to_column_ids;
3168 std::ostringstream oss;
3169 oss <<
"Cache size information {";
3173 auto resultset_cache_size =
3174 executor->getResultSetRecyclerHolder()
3175 .getResultSetRecycler()
3176 ->getResultSetRecyclerMetricTracker()
3178 if (resultset_cache_size) {
3179 oss <<
"\"query_resultset\": " << *resultset_cache_size <<
" bytes, ";
3183 auto perfect_join_ht_cache_size =
3186 auto baseline_join_ht_cache_size =
3189 auto bbox_intersect_ht_cache_size =
3193 auto bbox_intersect_ht_tuner_cache_size =
3197 auto sum_hash_table_cache_size =
3198 perfect_join_ht_cache_size + baseline_join_ht_cache_size +
3199 bbox_intersect_ht_cache_size + bbox_intersect_ht_tuner_cache_size;
3200 oss <<
"\"hash_tables\": " << sum_hash_table_cache_size <<
" bytes, ";
3203 auto chunk_metadata_cache_size =
3204 executor->getResultSetRecyclerHolder()
3205 .getChunkMetadataRecycler()
3208 oss <<
"\"chunk_metadata\": " << chunk_metadata_cache_size <<
" bytes, ";
3211 auto query_plan_dag_cache_size =
3212 executor->getQueryPlanDagCache().getCurrentNodeMapSize();
3213 oss <<
"\"query_plan_dag\": " << query_plan_dag_cache_size <<
" bytes, ";
3216 oss <<
"\"compiled_GPU code\": "
3229 std::ostringstream oss;
3238 const TSessionId& session_id,
3240 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3244 const std::string& table_name) {
3245 auto geo_col_idx = col_idx - 1;
3246 const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
3247 std::vector<std::vector<double>> coords_column, bounds_column;
3248 std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
3250 const bool validate_with_geos_if_available =
false;
3251 if (num_rows != wkt_or_wkb_hex_column->size() ||
3258 validate_with_geos_if_available)) {
3259 std::ostringstream oss;
3260 oss <<
"Invalid geometry in column " << cd->
columnName;
3276 const TSessionId& session_id,
3278 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3279 const std::list<const ColumnDescriptor*>& cds,
3280 const std::vector<int>& desc_id_to_column_id,
3282 const std::string& table_name) {
3283 size_t skip_physical_cols = 0;
3284 size_t col_idx = 0, import_idx = 0;
3285 for (
const auto& cd : cds) {
3286 if (skip_physical_cols > 0) {
3287 CHECK(cd->isGeoPhyCol);
3288 skip_physical_cols--;
3290 }
else if (cd->columnType.is_geometry()) {
3291 skip_physical_cols = cd->columnType.get_physical_cols();
3293 if (desc_id_to_column_id[import_idx] == -1) {
3294 import_buffers[col_idx]->addDefaultValues(cd, num_rows);
3296 if (cd->columnType.is_geometry()) {
3298 session_id, catalog, import_buffers, cd, col_idx, num_rows, table_name);
3302 col_idx += skip_physical_cols;
3309 std::string
get_load_tag(
const std::string& load_tag,
const std::string& table_name) {
3310 std::ostringstream oss;
3311 oss << load_tag <<
"(" << table_name <<
")";
3316 const std::string& table_name,
3317 const std::string& file_path) {
3318 std::ostringstream oss;
3319 oss << import_tag <<
"(" << table_name <<
", file_path:" << file_path <<
")";
3325 const std::string& table_name,
3326 const std::vector<TRow>&
rows,
3327 const std::vector<std::string>& column_names) {
3334 auto session_ptr = stdlog.getConstSessionInfo();
3341 std::unique_ptr<import_export::Loader> loader;
3342 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3345 rows.front().cols.size(),
3349 "load_table_binary");
3351 auto col_descs = loader->get_column_descs();
3354 size_t rows_completed = 0;
3355 auto const load_tag =
get_load_tag(
"load_table_binary", table_name);
3357 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3360 for (
auto const& row : rows) {
3363 for (
auto cd : col_descs) {
3364 auto mapped_idx = desc_id_to_column_id[col_idx];
3365 if (mapped_idx != -1) {
3366 import_buffers[col_idx]->add_value(
3367 cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3372 }
catch (
const std::exception& e) {
3373 for (
size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3374 import_buffers[col_idx_to_pop]->pop_value();
3376 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3377 <<
". Row discarded, issue at column : " << (col_idx + 1)
3378 <<
" data :" << row;
3382 session_ptr->getCatalog(),
3385 desc_id_to_column_id,
3389 session_ptr->getCatalog(), table_name);
3390 if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3393 }
catch (
const std::exception& e) {
3398 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3401 const std::string& table_name,
3403 std::unique_ptr<import_export::Loader>* loader,
3404 std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3405 const std::vector<std::string>& column_names,
3406 std::string load_type) {
3407 if (num_cols == 0) {
3413 std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3415 cat, table_name,
true));
3416 const auto td = (*td_with_lock)();
3427 auto col_descs = (*loader)->get_column_descs();
3429 if (column_names.empty()) {
3432 auto geo_physical_cols = std::count_if(
3433 col_descs.begin(), col_descs.end(), [](
auto cd) {
return cd->isGeoPhyCol; });
3434 const auto num_table_cols =
static_cast<size_t>(td->nColumns) - geo_physical_cols -
3435 (td->hasDeletedCol ? 2 : 1);
3436 if (num_cols != num_table_cols) {
3437 throw std::runtime_error(
"Number of columns to load (" +
std::to_string(num_cols) +
3438 ") does not match number of columns in table " +
3442 }
else if (num_cols != column_names.size()) {
3444 "Number of columns specified does not match the "
3445 "number of columns given (" +
3450 return std::move(td_with_lock);
3455 if (!column.nulls.empty()) {
3456 return column.nulls.size();
3461 return column.data.int_col.size() + column.data.arr_col.size() +
3462 column.data.real_col.size() + column.data.str_col.size();
3469 const std::string& table_name,
3470 const std::vector<TColumn>& cols,
3471 const std::vector<std::string>& column_names) {
3477 auto session_ptr = stdlog.getConstSessionInfo();
3480 std::unique_ptr<import_export::Loader> loader;
3481 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3488 "load_table_binary_columnar");
3490 auto desc_id_to_column_id =
3493 size_t import_idx = 0;
3495 auto const load_tag =
get_load_tag(
"load_table_binary_columnar", table_name);
3497 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3501 size_t skip_physical_cols = 0;
3502 for (
auto cd : loader->get_column_descs()) {
3503 if (skip_physical_cols > 0) {
3504 CHECK(cd->isGeoPhyCol);
3505 skip_physical_cols--;
3508 auto mapped_idx = desc_id_to_column_id[import_idx];
3509 if (mapped_idx != -1) {
3510 size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
3511 if (col_rows != num_rows) {
3512 std::ostringstream oss;
3513 oss <<
"load_table_binary_columnar: Inconsistent number of rows in column "
3514 << cd->columnName <<
" , expecting " << num_rows <<
" rows, column "
3515 << col_idx <<
" has " << col_rows <<
" rows";
3521 if (cd->columnType.is_geometry()) {
3523 session_ptr->getCatalog(),
3529 skip_physical_cols = cd->columnType.get_physical_cols();
3533 if (cd->columnType.is_geometry()) {
3534 skip_physical_cols = cd->columnType.get_physical_cols();
3535 col_idx += skip_physical_cols;
3541 }
catch (
const std::exception& e) {
3542 std::ostringstream oss;
3543 oss <<
"load_table_binary_columnar: Input exception thrown: " << e.what()
3544 <<
". Issue at column : " << (col_idx + 1) <<
". Import aborted";
3548 session_ptr->getCatalog(),
3550 loader->get_column_descs(),
3551 desc_id_to_column_id,
3555 session_ptr->getCatalog(), table_name);
3556 if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3563 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3565 ::arrow::Status _s = (s); \
3566 if (UNLIKELY(!_s.ok())) { \
3568 ex.error_msg = _s.ToString(); \
3569 LOG(ERROR) << s.ToString(); \
3580 auto stream_buffer =
3581 std::make_shared<arrow::Buffer>(
reinterpret_cast<const uint8_t*
>(stream.c_str()),
3582 static_cast<int64_t>(stream.size()));
3584 arrow::io::BufferReader buf_reader(stream_buffer);
3585 std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3587 arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3590 std::shared_ptr<arrow::RecordBatch> batch;
3593 if (batch ==
nullptr) {
3596 batches.emplace_back(std::move(batch));
3598 }
catch (
const std::exception& e) {
3599 LOG(
ERROR) <<
"Error parsing Arrow stream: " << e.what() <<
". Import aborted";
3607 const std::string& table_name,
3608 const std::string& arrow_stream,
3609 const bool use_column_names) {
3614 auto session_ptr = stdlog.getConstSessionInfo();
3618 if (batches.size() != 1) {
3622 std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3623 std::unique_ptr<import_export::Loader> loader;
3624 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3625 std::vector<std::string> column_names;
3626 if (use_column_names) {
3627 column_names = batch->schema()->field_names();
3630 auto schema_read_lock =
3633 static_cast<size_t>(batch->num_columns()),
3637 "load_table_binary_arrow");
3639 auto desc_id_to_column_id =
3641 size_t num_rows = 0;
3645 auto const load_tag =
get_load_tag(
"load_table_binary_arrow", table_name);
3647 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3651 for (
auto cd : loader->get_column_descs()) {
3652 if (cd->isGeoPhyCol) {
3660 auto mapped_idx = desc_id_to_column_id[col_idx];
3661 if (mapped_idx != -1) {
3662 auto& array = *batch->column(mapped_idx);
3666 size_t col_id = cd->columnId;
3673 num_rows = import_buffers[col_id - 1]->add_arrow_values(
3674 cd, array,
true, row_slice,
nullptr);
3676 if (cd->columnType.is_geometry()) {
3678 session_ptr->getCatalog(),
3689 }
catch (
const std::exception& e) {
3690 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3691 <<
". Issue at column : " << (col_idx + 1) <<
". Import aborted";
3697 session_ptr->getCatalog(),
3699 loader->get_column_descs(),
3700 desc_id_to_column_id,
3704 session_ptr->getCatalog(), table_name);
3705 if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3711 const std::string& table_name,
3712 const std::vector<TStringRow>&
rows,
3713 const std::vector<std::string>& column_names) {
3720 auto session_ptr = stdlog.getConstSessionInfo();
3725 auto const load_tag =
get_load_tag(
"load_table", table_name);
3727 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3731 std::unique_ptr<import_export::Loader> loader;
3732 std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3733 auto schema_read_lock =
3736 static_cast<size_t>(rows.front().cols.size()),
3742 auto col_descs = loader->get_column_descs();
3745 size_t rows_completed = 0;
3746 for (
auto const& row : rows) {
3747 size_t import_idx = 0;
3750 size_t skip_physical_cols = 0;
3751 for (
auto cd : col_descs) {
3752 if (skip_physical_cols > 0) {
3753 CHECK(cd->isGeoPhyCol);
3754 skip_physical_cols--;
3757 auto mapped_idx = desc_id_to_column_id[import_idx];
3758 if (mapped_idx != -1) {
3759 import_buffers[col_idx]->add_value(cd,
3760 row.cols[mapped_idx].str_val,
3761 row.cols[mapped_idx].is_null,
3765 if (cd->columnType.is_geometry()) {
3767 skip_physical_cols = cd->columnType.get_physical_cols();
3768 col_idx += skip_physical_cols;
3774 }
catch (
const std::exception& e) {
3775 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3776 <<
". Row discarded, issue at column : " << (col_idx + 1)
3777 <<
" data :" << row;
3782 if (rows.size() != 0) {
3783 const auto& row = rows[0];
3786 size_t import_idx = 0;
3787 size_t skip_physical_cols = 0;
3788 for (
auto cd : col_descs) {
3789 if (skip_physical_cols > 0) {
3790 skip_physical_cols--;
3793 auto mapped_idx = desc_id_to_column_id[import_idx];
3795 if (cd->columnType.is_geometry()) {
3796 skip_physical_cols = cd->columnType.get_physical_cols();
3797 if (mapped_idx != -1) {
3799 session_ptr->getCatalog(),
3806 col_idx += skip_physical_cols;
3811 }
catch (
const std::exception& e) {
3812 LOG(
ERROR) <<
"Input exception thrown: " << e.what()
3813 <<
". Row discarded, issue at column : " << (col_idx + 1)
3814 <<
" data :" << row;
3819 session_ptr->getCatalog(),
3822 desc_id_to_column_id,
3826 session_ptr->getCatalog(), table_name);
3827 if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3831 }
catch (
const std::exception& e) {
3838 if (str.size() == 2 && str[0] ==
'\\') {
3839 if (str[1] ==
't') {
3841 }
else if (str[1] ==
'n') {
3843 }
else if (str[1] ==
'0') {
3845 }
else if (str[1] ==
'\'') {
3847 }
else if (str[1] ==
'\\') {
3856 switch (cp.has_header) {
3857 case TImportHeaderRow::AUTODETECT:
3860 case TImportHeaderRow::NO_HEADER:
3863 case TImportHeaderRow::HAS_HEADER:
3869 copy_params.
quoted = cp.quoted;
3870 if (cp.delimiter.length() > 0) {
3875 if (cp.null_str.length() > 0) {
3876 copy_params.
null_str = cp.null_str;
3878 if (cp.quote.length() > 0) {
3881 if (cp.escape.length() > 0) {
3884 if (cp.line_delim.length() > 0) {
3887 if (cp.array_delim.length() > 0) {
3890 if (cp.array_begin.length() > 0) {
3893 if (cp.array_end.length() > 0) {
3896 if (cp.threads != 0) {
3897 copy_params.
threads = cp.threads;
3899 if (cp.s3_access_key.length() > 0) {
3902 if (cp.s3_secret_key.length() > 0) {
3905 if (cp.s3_session_token.length() > 0) {
3908 if (cp.s3_region.length() > 0) {
3911 if (cp.s3_endpoint.length() > 0) {
3916 cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3917 const auto& server_credentials =
3918 Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3919 copy_params.
s3_access_key = server_credentials.GetAWSAccessKeyId();
3920 copy_params.
s3_secret_key = server_credentials.GetAWSSecretKey();
3925 switch (cp.source_type) {
3926 case TSourceType::DELIMITED_FILE:
3929 case TSourceType::GEO_FILE:
3932 case TSourceType::PARQUET_FILE:
3933 #ifdef ENABLE_IMPORT_PARQUET
3939 case TSourceType::ODBC:
3941 case TSourceType::RASTER_FILE:
3948 switch (cp.geo_coords_encoding) {
3949 case TEncodingType::GEOINT:
3952 case TEncodingType::NONE:
3960 switch (cp.geo_coords_type) {
3961 case TDatumType::GEOGRAPHY:
3964 case TDatumType::GEOMETRY:
3971 switch (cp.geo_coords_srid) {
3985 switch (cp.raster_point_type) {
3986 case TRasterPointType::NONE:
3989 case TRasterPointType::AUTO:
3992 case TRasterPointType::SMALLINT:
3995 case TRasterPointType::INT:
3998 case TRasterPointType::FLOAT:
4001 case TRasterPointType::DOUBLE:
4004 case TRasterPointType::POINT:
4011 if (cp.raster_scanlines_per_thread < 0) {
4017 switch (cp.raster_point_transform) {
4018 case TRasterPointTransform::NONE:
4021 case TRasterPointTransform::AUTO:
4024 case TRasterPointTransform::FILE:
4027 case TRasterPointTransform::WORLD:
4035 copy_params.
dsn = cp.odbc_dsn;
4039 copy_params.
username = cp.odbc_username;
4040 copy_params.
password = cp.odbc_password;
4050 TCopyParams copy_params;
4052 copy_params.null_str = cp.
null_str;
4055 copy_params.has_header = TImportHeaderRow::AUTODETECT;
4058 copy_params.has_header = TImportHeaderRow::NO_HEADER;
4061 copy_params.has_header = TImportHeaderRow::HAS_HEADER;
4066 copy_params.quoted = cp.
quoted;
4067 copy_params.quote = cp.
quote;
4068 copy_params.escape = cp.
escape;
4073 copy_params.threads = cp.
threads;
4081 copy_params.source_type = TSourceType::DELIMITED_FILE;
4084 copy_params.source_type = TSourceType::GEO_FILE;
4087 copy_params.source_type = TSourceType::PARQUET_FILE;
4090 copy_params.source_type = TSourceType::RASTER_FILE;
4093 copy_params.source_type = TSourceType::ODBC;
4100 copy_params.geo_coords_encoding = TEncodingType::GEOINT;
4103 copy_params.geo_coords_encoding = TEncodingType::NONE;
4109 copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
4112 copy_params.geo_coords_type = TDatumType::GEOMETRY;
4120 copy_params.geo_assign_render_groups =
false;
4125 copy_params.raster_point_type = TRasterPointType::NONE;
4128 copy_params.raster_point_type = TRasterPointType::AUTO;
4131 copy_params.raster_point_type = TRasterPointType::SMALLINT;
4134 copy_params.raster_point_type = TRasterPointType::INT;
4137 copy_params.raster_point_type = TRasterPointType::FLOAT;
4140 copy_params.raster_point_type = TRasterPointType::DOUBLE;
4143 copy_params.raster_point_type = TRasterPointType::POINT;
4152 copy_params.raster_point_transform = TRasterPointTransform::NONE;
4155 copy_params.raster_point_transform = TRasterPointTransform::AUTO;
4158 copy_params.raster_point_transform = TRasterPointTransform::FILE;
4161 copy_params.raster_point_transform = TRasterPointTransform::WORLD;
4168 copy_params.odbc_dsn = cp.
dsn;
4172 copy_params.odbc_username = cp.
username;
4173 copy_params.odbc_password = cp.
password;
4188 if (boost::istarts_with(path,
"http://") || boost::istarts_with(path,
"https://")) {
4189 if (!gdal_network) {
4191 "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
4194 path =
"/vsicurl/" + path;
4195 }
else if (boost::istarts_with(path,
"s3://")) {
4196 if (!gdal_network) {
4198 "S3 geo file import not supported! Update to GDAL 2.2 or later!");
4201 boost::replace_first(path,
"s3://",
"/vsis3/");
4207 if (boost::iends_with(path,
".gz") && !boost::iends_with(path,
".tar.gz")) {
4208 path =
"/vsigzip/" + path;
4214 if (boost::iends_with(path,
".zip")) {
4216 path =
"/vsizip/" + path;
4217 }
else if (boost::iends_with(path,
".tar") || boost::iends_with(path,
".tgz") ||
4218 boost::iends_with(path,
".tar.gz")) {
4220 path =
"/vsitar/" + path;
4225 std::string path(path_in);
4228 if (boost::istarts_with(path,
"/vsizip/")) {
4229 boost::replace_first(path,
"/vsizip/",
"");
4230 }
else if (boost::istarts_with(path,
"/vsitar/")) {
4231 boost::replace_first(path,
"/vsitar/",
"");
4232 }
else if (boost::istarts_with(path,
"/vsigzip/")) {
4233 boost::replace_first(path,
"/vsigzip/",
"");
4237 if (boost::istarts_with(path,
"/vsicurl/")) {
4238 boost::replace_first(path,
"/vsicurl/",
"");
4239 }
else if (boost::istarts_with(path,
"/vsis3/")) {
4240 boost::replace_first(path,
"/vsis3/",
"s3://");
4247 if (boost::istarts_with(path,
"s3://") || boost::istarts_with(path,
"http://") ||
4248 boost::istarts_with(path,
"https://")) {
4251 return !boost::filesystem::path(path).is_absolute();
4255 auto filename = boost::filesystem::path(path).filename().string();
4269 if (boost::iends_with(path,
".shp") || boost::iends_with(path,
".geojson") ||
4270 boost::iends_with(path,
".json") || boost::iends_with(path,
".kml") ||
4271 boost::iends_with(path,
".kmz") || boost::iends_with(path,
".gdb") ||
4272 boost::iends_with(path,
".gdb.zip") || boost::iends_with(path,
".fgb")) {
4282 if (boost::iends_with(path,
".zip") && !boost::iends_with(path,
".gdb.zip")) {
4284 }
else if (boost::iends_with(path,
".tar") || boost::iends_with(path,
".tgz") ||
4285 boost::iends_with(path,
".tar.gz")) {
// `files` holds the entry names obtained for the archive.
4294 std::vector<std::string> files =
// The entry count is logged at INFO level before the entries are walked.
4298 LOG(
INFO) <<
"Found " << files.size() <<
" files in Archive "
4300 for (
const auto& file : files) {
// Second pass over the same list: found_suitable_file starts false and is set
// to true inside the loop; file_name is declared alongside it.
4305 bool found_suitable_file =
false;
4306 std::string file_name;
4307 for (
const auto& file : files) {
4310 found_suitable_file =
true;
// Nothing matched: the failure is reported through the INFO log stream.
4316 if (!found_suitable_file) {
4317 LOG(
INFO) <<
"Failed to find any supported geo files in Archive: " +
// True only when file_path starts with none of "s3://", "http://" or
// "https://" (all compared case-insensitively).
4327 return (!boost::istarts_with(file_path,
"s3://") &&
4328 !boost::istarts_with(file_path,
"http://") &&
4329 !boost::istarts_with(file_path,
"https://"));
// Visible parameters: the session id (or JSON), the caller-supplied file name
// and the thrift copy parameters.
4341 const TSessionId& session_id_or_json,
4342 const std::string& file_name_in,
4343 const TCopyParams& cp) {
4350 bool is_raster =
false;
4351 boost::filesystem::path file_path;
// Mutable working copy of the caller's file name.
4354 std::string file_name{file_name_in};
// A path is composed whose trailing components are the hex hash of the session
// id followed by the bare filename; file_name is then replaced with
// temp_file_path's string form.
4358 picosha2::hash256_hex_string(request_info.
sessionId()) /
4359 boost::filesystem::path(file_name).filename();
4360 file_name = temp_file_path.string();
// file_paths must be non-empty; only its first element is used.
4373 CHECK(!file_paths.empty());
4374 file_name = file_paths[0];
// A non-empty geo_file is appended to file_name after a "/" separator.
4389 if (geo_file.size()) {
4390 file_name = file_name + std::string(
"/") + geo_file;
4404 file_path = boost::filesystem::path(file_name);
// Names that are neither "s3://" URLs nor absolute paths are rebuilt from the
// session hash and the bare filename.
4406 if (!boost::istarts_with(file_name,
"s3://")) {
4407 if (!boost::filesystem::path(file_name).is_absolute()) {
4409 picosha2::hash256_hex_string(request_info.
sessionId()) /
4410 boost::filesystem::path(file_name).filename();
4411 file_name = file_path.string();
4419 "\" does not exist.")
4425 "\" does not exist.");
4433 #ifdef ENABLE_IMPORT_PARQUET
4439 std::vector<std::string> headers = detector.
get_headers();
// One row descriptor entry per detected type.
4443 _return.row_set.row_desc.resize(best_types.size());
4444 for (
size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4446 auto& ti = best_types[col_idx];
4447 col.col_type.precision = ti.get_precision();
4448 col.col_type.scale = ti.get_scale();
4449 col.col_type.comp_param = ti.get_comp_param();
// For geometry types the precision field is overwritten with the integer value
// of copy_params.geo_coords_type.
4450 if (ti.is_geometry()) {
4454 col.col_type.precision =
static_cast<int>(copy_params.
geo_coords_type);
4460 if (ti.is_array()) {
4461 col.col_type.is_array =
true;
// NOTE(review): indexes headers with col_idx, which is bounded by
// best_types.size() only; confirm headers is always at least that long.
4466 col.col_name = headers[col_idx];
4469 _return.row_set.row_desc[col_idx] = col;
// NOTE(review): `auto row` copies each sample row per iteration; a const
// reference looks sufficient for the visible uses.
4474 for (
auto row : sample_data) {
4475 sample_row.cols.clear();
4476 for (
const auto& s : row) {
// An empty string is reported as a null datum.
4479 td.is_null = s.empty();
4480 sample_row.cols.push_back(td);
4482 _return.row_set.rows.push_back(sample_row);
4489 for (
auto cd : cds) {
// Here the sample data is keyed by name; each value is a vector of strings.
4497 std::map<std::string, std::vector<std::string>> sample_data;
// The number of rows emitted is the length of the first map entry's vector.
4503 if (sample_data.size() > 0) {
4504 for (
size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4506 for (
auto cd : cds) {
// NOTE(review): operator[] inserts an empty vector when cd.sourceName is not
// yet a key, and .at(i) then throws std::out_of_range; verify every sourceName
// is present with at least as many values as the first entry.
4508 td.val.str_val = sample_data[cd.sourceName].at(i);
4509 td.is_null = td.val.str_val.empty();
4510 sample_row.cols.push_back(td);
4512 _return.row_set.rows.push_back(sample_row);
4518 }
catch (
const std::exception& e) {
// Visible parameters: session id (or JSON), widget id, the vega JSON document,
// a compression level and a nonce.
4524 const TSessionId& session_id_or_json,
4525 const int64_t widget_id,
4526 const std::string& vega_json,
4527 const int compression_level,
4528 const std::string& nonce) {
4534 "compression_level",
// The nonce is recorded on the request log entry.
4541 stdlog.appendNameValuePairs(
"nonce", nonce);
// NOTE(review): constness of the vega_json parameter is cast away so it can be
// passed through std::move below. The callee may therefore consume the
// caller's string even though the signature promises const; confirm this is
// intended and that the referenced object is never truly const.
4549 auto& non_const_vega_json =
const_cast<std::string&
>(vega_json);
4554 stdlog.getSessionInfo(),
4556 std::move(non_const_vega_json),
4559 }
catch (std::exception& e) {
4566 int32_t dashboard_id,
// The DBObject's key is resolved against the catalog, the requested
// permissions are set on it, and it is wrapped in a one-element vector.
4571 object.loadKey(catalog);
4572 object.setPrivileges(requestedPermissions);
4573 std::vector<DBObject> privs = {
object};
// Builds a CustomExpression from its thrift representation.
4583 const TCustomExpression& t_custom_expr,
// An empty data source name is handled as a distinct case.
4585 if (t_custom_expr.data_source_name.empty()) {
// TABLE is the only data source type accepted; anything else fails the CHECK
// with the numeric type value in the message.
4588 CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4589 <<
"Unexpected data source type: "
4590 <<
static_cast<int>(t_custom_expr.data_source_type);
4594 t_custom_expr.data_source_name +
"\" that does not exist.")
// The result carries the thrift name and expression JSON, the data source type
// and the table id taken from td.
4597 return std::make_unique<CustomExpression>(
4598 t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
// Builds the thrift TCustomExpression for custom_expr, copying id, name and
// the is_deleted flag field by field.
4603 TCustomExpression t_custom_expr;
4604 t_custom_expr.id = custom_expr.
id;
4605 t_custom_expr.name = custom_expr.
name;
4608 t_custom_expr.is_deleted = custom_expr.
is_deleted;
4610 <<
"Unexpected data source type: "
// The thrift data source type is always written as TABLE.
4612 t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
// The data source name is taken from the table descriptor td.
4615 t_custom_expr.data_source_name = td->
tableName;
// Message emitted for an expression whose data source has been deleted; it
// includes the expression's id and name.
4618 <<
"Custom expression references a deleted data source. Custom expression id: "
4619 << custom_expr.
id <<
", name: " << custom_expr.
name;
4621 return t_custom_expr;
4626 const TCustomExpression& t_custom_expr) {
4633 auto session_ptr = stdlog.getConstSessionInfo();
// Callers that are not super users take a separate branch here.
4634 if (!session_ptr->get_currentUser().isSuper) {
4637 auto& catalog = session_ptr->getCatalog();
// The value returned is whatever catalog.createCustomExpression yields.
4639 return catalog.createCustomExpression(
4644 const TSessionId& session_id_or_json) {
4650 auto session_ptr = stdlog.getConstSessionInfo();
4651 auto& catalog = session_ptr->getCatalog();
// The catalog is asked for the custom expressions of the session's current
// user, and each one is then visited in turn.
4653 auto custom_expressions =
4654 catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4655 for (
const auto& custom_expression : custom_expressions) {
4662 const std::string& expression_json) {
4669 auto session_ptr = stdlog.getConstSessionInfo();
// Callers that are not super users take a separate branch here.
4670 if (!session_ptr->get_currentUser().isSuper) {
4673 auto& catalog = session_ptr->getCatalog();
// The catalog stores expression_json as the new definition for `id`.
4675 catalog.updateCustomExpression(
id, expression_json);
// Visible parameters: session id (or JSON), the ids to delete and the
// do_soft_delete flag.
4679 const TSessionId& session_id_or_json,
4680 const std::vector<int32_t>& custom_expression_ids,
4681 const bool do_soft_delete) {
4688 auto session_ptr = stdlog.getConstSessionInfo();
// Callers that are not super users take a separate branch here.
4689 if (!session_ptr->get_currentUser().isSuper) {
4692 auto& catalog = session_ptr->getCatalog();
// Both the id list and do_soft_delete are forwarded to the catalog unchanged.
4694 catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4699 const TSessionId& session_id_or_json,
4700 const int32_t dashboard_id) {
4705 auto session_ptr = stdlog.getConstSessionInfo();
4706 auto const&
cat = session_ptr->getCatalog();
// Dashboard metadata is looked up in the session's catalog by numeric id.
4708 auto dash =
cat.getMetadataForDashboard(dashboard_id);
4724 const TSessionId& session_id_or_json) {
4729 auto session_ptr = stdlog.getConstSessionInfo();
4730 auto const&
cat = session_ptr->getCatalog();
// Metadata for every dashboard in the catalog is fetched, then iterated.
4732 const auto dashes =
cat.getAllDashboardsMetadata();
// NOTE(review): `const auto dash` takes each element by value; if the element
// type is not a cheap handle, `const auto&` would avoid a copy per iteration.
4734 for (
const auto dash : dashes) {
// Fills in a TDashboard: owner, the caller's permissions and the shared flag.
4746 const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4749 const bool populate_state) {
4750 auto const&
cat = session_ptr->getCatalog();
4753 cat.getCurrentDB().dbId,
4756 TDashboard dashboard;
// populate_state gates a dedicated section of the population logic.
4758 if (populate_state) {
4765 dashboard.dashboard_owner = dash->
user;
4766 TDashboardPermissions perms;
// Super users have the create_ and delete_ permissions set to true here.
4768 if (session_ptr->get_currentUser().isSuper) {
4769 perms.create_ =
true;
4770 perms.delete_ =
true;
4778 obj_to_find.loadKey(
cat);
// A grantee list is built using the current user's isSuper flag and user name
// among the arguments; each grantee is then examined.
4779 std::vector<std::string> grantees =
4781 session_ptr->get_currentUser().isSuper,
4782 session_ptr->get_currentUser().userName);
4783 for (
const auto& grantee : grantees) {
// object_found is assigned inside the condition; the branch is taken only when
// gr is non-null and findDbObject returned a non-null result.
4786 if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(),
true))) {
4795 dashboard.dashboard_permissions = perms;
// is_dash_shared is false when objects_list is empty, or when its single entry
// has a roleName equal to user_meta.userName; otherwise it is true.
4796 if (objects_list.empty() ||
4797 (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.
userName)) {
4798 dashboard.is_dash_shared =
false;
4800 dashboard.is_dash_shared =
true;
4805 namespace dbhandler {
// Message used when a write request or query is rejected; the text continues
// past the concatenation shown here.
4813 std::string error_message{
"Write requests/queries are not allowed in the " +
// throw_db_exception selects how the error is raised; a std::runtime_error
// carrying error_message is one of the outcomes.
4815 if (throw_db_exception) {
4818 throw std::runtime_error(error_message);
// Visible parameters: dashboard name, serialized state, image hash and
// metadata.
4825 const std::string& dashboard_name,
4826 const std::string& dashboard_state,
4827 const std::string& image_hash,
4828 const std::string& dashboard_metadata) {
4833 auto session_ptr = stdlog.getConstSessionInfo();
4836 auto&
cat = session_ptr->getCatalog();
// The descriptor's owner fields are taken from the session's current user.
4855 dd.
userId = session_ptr->get_currentUser().userId;
4856 dd.
user = session_ptr->get_currentUser().userName;
// `id` holds the value returned by the catalog for the new dashboard.
4859 auto id =
cat.createDashboard(dd);
4864 }
catch (
const std::exception& e) {
// Visible parameters: the id of the dashboard to replace plus its name, owner,
// state, image hash and metadata.
4870 const int32_t dashboard_id,
4871 const std::string& dashboard_name,
4872 const std::string& dashboard_owner,
4873 const std::string& dashboard_state,
4874 const std::string& image_hash,
4875 const std::string& dashboard_metadata) {
4880 auto session_ptr = stdlog.getConstSessionInfo();
4883 auto&
cat = session_ptr->getCatalog();
// Looks up a dashboard by (current user's id as a string, dashboard_name). A
// match whose id differs from dashboard_id is treated as a separate case.
4893 if (
auto dash =
cat.getMetadataForDashboard(
4894 std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
4895 if (dash->dashboardId != dashboard_id) {
// The descriptor's user comes from the dashboard_owner argument rather than
// from the session.
4911 dd.
user = dashboard_owner;
4915 cat.replaceDashboard(dd);
4916 }
catch (
const std::exception& e) {
4922 const int32_t dashboard_id) {
4927 const std::vector<int32_t>& dashboard_ids) {
4932 auto session_ptr = stdlog.getConstSessionInfo();
4934 auto&
cat = session_ptr->getCatalog();
// The full id list is passed to the catalog together with the current user.
4940 cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4941 }
catch (
const std::exception& e) {
// Returns the subset of `groups` collected in valid_groups.
// NOTE(review): `groups` is taken by value, so the caller's vector is copied.
4947 int32_t dashboard_id,
4948 std::vector<std::string> groups) {
4952 auto&
cat = session_info.getCatalog();
4953 auto dash = cat.getMetadataForDashboard(dashboard_id);
// A caller who is neither the dashboard's owner (matching userId) nor a super
// user is rejected with std::runtime_error.
4957 }
else if (session_info.get_currentUser().userId != dash->userId &&
4958 !session_info.get_currentUser().isSuper) {
4959 throw std::runtime_error(
4960 "User should be either owner of dashboard or super user to share/unshare it");
4962 std::vector<std::string> valid_groups;
4964 for (
auto& group : groups) {
// An entry whose user_meta is not flagged isSuper is added to valid_groups.
4968 }
else if (!user_meta.
isSuper) {
4969 valid_groups.push_back(group);
4972 return valid_groups;
4976 for (
auto const& group : groups) {
4985 const std::vector<int32_t>& dashboard_ids) {
// Failures are grouped by message: each error text maps to the list of
// dashboard ids that produced it.
4987 std::map<std::string, std::list<int32_t>> errors;
4988 for (
auto const& dashboard_id : dashboard_ids) {
4989 auto dashboard =
cat.getMetadataForDashboard(dashboard_id);
4991 errors[
"Dashboard id does not exist"].push_back(dashboard_id);
4994 errors[
"User should be either owner of dashboard or super user to share/unshare it"]
4995 .push_back(dashboard_id);
// When anything failed, a single report is assembled: a heading line, then one
// line per error listing its comma-separated dashboard ids.
4998 if (!errors.empty()) {
4999 std::stringstream error_stream;
5000 error_stream <<
"Share/Unshare dashboard(s) failed with error(s)\n";
5001 for (
const auto& [error, id_list] : errors) {
5002 error_stream <<
"Dashboard ids " <<
join(id_list,
", ") <<
": " << error <<
"\n";
// Grants or revokes dashboard privileges for `groups` in one batch; do_share
// chooses the direction.
5009 const std::vector<int32_t>& dashboard_ids,
5010 const std::vector<std::string>& groups,
5011 const TDashboardPermissions& permissions,
5012 const bool do_share) {
// The read-only check is labelled with the operation name for the direction.
5015 check_read_only(do_share ?
"share_dashboards" :
"unshare_dashboards");
// A request with none of the create_/delete_/edit_/view_ permissions set is
// handled as a distinct case; its message uses "grants" or "revokes".
5016 if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
5017 !permissions.view_) {
5019 std::string(do_share ?
"grants" :
"revokes"));
5021 auto session_ptr = stdlog.getConstSessionInfo();
5022 auto const& catalog = session_ptr->getCatalog();
// One DBObject per dashboard id is prepared; each permission flag is tested
// individually, then the privileges are set and the object is queued.
5026 std::vector<DBObject> batch_objects;
5027 for (
auto const& dashboard_id : dashboard_ids) {
5030 if (permissions.delete_) {
5033 if (permissions.create_) {
5036 if (permissions.edit_) {
5039 if (permissions.view_) {
5042 object.setPrivileges(privs);
5043 batch_objects.push_back(
object);
// The queued objects are applied through the system catalog's batch grant and
// batch revoke entry points.
5046 sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5048 sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5053 const std::vector<int32_t>& dashboard_ids,
5054 const std::vector<std::string>& groups,
5055 const TDashboardPermissions& permissions) {
// Forwards the session id and all three arguments, with a final argument of
// true.
5059 request_info.
sessionId(), dashboard_ids, groups, permissions,
true);
5064 const int32_t dashboard_id,
5065 const std::vector<std::string>& groups,
5066 const std::vector<std::string>& objects,
5067 const TDashboardPermissions& permissions,
5068 const bool grant_role =
false) {
5073 const std::vector<int32_t>& dashboard_ids,
5074 const std::vector<std::string>& groups,
5075 const TDashboardPermissions& permissions) {
// Forwards the session id and all three arguments, with a final argument of
// false.
5079 request_info.
sessionId(), dashboard_ids, groups, permissions,
false);
5083 const int32_t dashboard_id,
5084 const std::vector<std::string>& groups,
5085 const std::vector<std::string>& objects,
5086 const TDashboardPermissions& permissions) {
// Appends one TDashboardGrantees entry per grantee to the dashboard_grantees
// output parameter.
5091 std::vector<TDashboardGrantees>& dashboard_grantees,
5092 const TSessionId& session_id_or_json,
5093 const int32_t dashboard_id) {
5098 auto session_ptr = stdlog.getConstSessionInfo();
5099 auto const&
cat = session_ptr->getCatalog();
5101 auto dash =
cat.getMetadataForDashboard(dashboard_id);
// Branch for a caller who is neither the dashboard's owner (matching userId)
// nor a super user; the message says such users may not access grantees.
5105 }
else if (session_ptr->get_currentUser().userId != dash->userId &&
5106 !session_ptr->get_currentUser().isSuper) {
5108 "User should be either owner of dashboard or super user to access grantees");
5110 std::vector<ObjectRoleDescriptor*> objectsList;
5112 cat.getCurrentDB().dbId,
5118 for (
auto object : objectsList) {
// Entries whose roleName equals user_meta.userName are handled separately.
5119 if (user_meta.
userName == object->roleName) {
5123 TDashboardGrantees grantee;
5124 TDashboardPermissions perm;
5125 grantee.name =
object->roleName;
// is_user is filled directly from the descriptor's roleType field.
5126 grantee.is_user =
object->roleType;
5131 grantee.permissions = perm;
5132 dashboard_grantees.push_back(grantee);
5137 const TSessionId& session_id_or_json,
5138 const std::string& view_state,
5139 const std::string& view_metadata) {
5144 auto session_ptr = stdlog.getConstSessionInfo();
5146 auto&
cat = session_ptr->getCatalog();
// The link descriptor is owned by the session's current user.
5149 ld.
userId = session_ptr->get_currentUser().userId;
// NOTE(review): the meaning of the literal 6 passed to createLink is not
// visible here; consider a named constant.
5154 _return =
cat.createLink(ld, 6);
5155 }
catch (
const std::exception& e) {
5161 const std::string&
name,
5162 const bool is_array) {
// The column type record receives the given type and the is_array flag.
5165 ct.col_type.type =
type;
5166 ct.col_type.is_array = is_array;
// Extensions of the files that make up a shapefile set.
5172 const std::list<std::string> shp_ext{
".shp",
".shx",
".dbf"};
// file_path's extension is lower-cased before being searched for in shp_ext.
5173 if (std::find(shp_ext.begin(),
5175 boost::algorithm::to_lower_copy(file_path.extension().string())) !=
// NOTE(review): `auto ext` copies each extension string per iteration.
5177 for (
auto ext : shp_ext) {
// aux_file is a copy of file_path whose extension is swapped out; both the
// upper-cased and the original form of each extension are tried.
5178 auto aux_file = file_path;
5180 aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
5183 aux_file.replace_extension(ext).string(), copy_params)) {
// The error names the companion file by its filename only.
5184 throw std::runtime_error(
"required file for shapefile does not exist: " +
5185 aux_file.filename().string());
// Assembles a CREATE TABLE statement as text from a thrift row descriptor.
5192 const std::string& table_name,
5193 const TRowDescriptor& rd,
5194 const TCreateParams& create_params) {
5197 auto stdlog =
STDLOG(
"table_name", table_name);
// NOTE(review): the statement is built by string concatenation from
// table_name, col.col_name and col.default_value; verify these are validated
// before they reach this point.
5209 std::string stmt{
"CREATE TABLE " + table_name};
5210 std::vector<std::string> col_stmts;
// `col` is a by-value copy, so the precision/scale defaults assigned below
// affect only the loop-local copy.
5212 for (
auto col : rds) {
// Interval column types are singled out; the associated message names the
// column.
5218 if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
5219 col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
5221 " for column: " + col.col_name);
5224 if (col.col_type.type == TDatumType::DECIMAL) {
// A DECIMAL with neither precision nor scale set gets precision 14, scale 7.
5226 if (col.col_type.precision == 0 && col.col_type.scale == 0) {
5227 col.col_type.precision = 14;
5228 col.col_type.scale = 7;
// Column clause: "<name> <type name>", then an optional DEFAULT clause.
5232 std::string col_stmt;
5233 col_stmt.append(col.col_name +
" " +
thrift_to_name(col.col_type));
5234 if (col.__isset.default_value) {
5235 col_stmt.append(
" DEFAULT " + col.default_value);
// comp_param is emitted in parentheses.
5248 col_stmt.append(
"(" +
std::to_string(col.col_type.comp_param) +
")");
5250 }
else if (col.col_type.type == TDatumType::STR) {
5252 col_stmt.append(
" ENCODING NONE");
5253 }
else if (col.col_type.type == TDatumType::POINT ||
5254 col.col_type.type == TDatumType::MULTIPOINT ||
5255 col.col_type.type == TDatumType::LINESTRING ||
5256 col.col_type.type == TDatumType::MULTILINESTRING ||
5257 col.col_type.type == TDatumType::POLYGON ||
5258 col.col_type.type == TDatumType::MULTIPOLYGON) {
// For these geo types a scale value of 4326 results in " ENCODING NONE" being
// appended.
5260 if (col.col_type.scale == 4326) {
5261 col_stmt.append(
" ENCODING NONE");
5264 col_stmts.push_back(col_stmt);
// Replicated tables get the PARTITIONS = 'REPLICATED' WITH clause.
5269 if (create_params.is_replicated) {
5270 stmt.append(
" WITH (PARTITIONS = 'REPLICATED')");
5281 const std::string& table_name,
5282 const std::string& file_name_in,
5283 const TCopyParams& cp) {
5290 auto session_ptr = stdlog.getConstSessionInfo();
5292 LOG(
INFO) <<
"import_table " << table_name <<
" from " << file_name_in;
5295 auto&
cat = session_ptr->getCatalog();
5299 executor->enrollQuerySession(request_info.
sessionId(),
5303 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5309 executor->clearQuerySessionStatus(request_info.sessionId(),
start_time);
5312 const auto td_with_lock =
5315 const auto td = td_with_lock();
5319 std::string copy_from_source;
5324 std::string file_name{file_name_in};
5325 auto file_path = boost::filesystem::path(file_name);
5326 if (!boost::istarts_with(file_name,
"s3://")) {
5327 if (!boost::filesystem::path(file_name).is_absolute()) {
5329 picosha2::hash256_hex_string(request_info.sessionId()) /
5330 boost::filesystem::path(file_name).filename();
5331 file_name = file_path.string();
5335 "\" does not exist.");
5343 if (boost::filesystem::path(file_path).extension() ==
".tsv") {
5347 copy_from_source = file_path.string();
5349 auto const load_tag =
get_import_tag(
"import_table", table_name, copy_from_source);
5351 ScopeGuard cleanup = [&load_tag, &session_ptr]() {
5355 session_ptr->getCatalog(), table_name);
5356 std::unique_ptr<import_export::AbstractImporter> importer;
5359 LOG(
INFO) <<
"Total Import Time: " << (double)ms / 1000.0 <<
" Seconds.";
5360 }
catch (
const TDBException& e) {
5362 }
catch (
const std::exception& e) {