OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::CopyTableStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::CopyTableStmt:
+ Collaboration diagram for Parser::CopyTableStmt:

Public Member Functions

 CopyTableStmt (std::string *t, std::string *f, std::list< NameValueAssign * > *o)
 
 CopyTableStmt (const rapidjson::Value &payload)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode, const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &importer_factory)
 
std::string & get_table () const
 
bool get_success () const
 
bool was_deferred_copy_from () const
 
void get_deferred_copy_from_payload (std::string &table, std::string &file_name, import_export::CopyParams &copy_params, std::string &partitions)
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< std::string > return_message
 

Private Attributes

std::unique_ptr< std::string > table_
 
std::unique_ptr< std::string > copy_from_source_pattern_
 
bool success_
 
std::list< std::unique_ptr< NameValueAssign > > options_
 
bool was_deferred_copy_from_ = false
 
std::string deferred_copy_from_file_name_
 
import_export::CopyParams deferred_copy_from_copy_params_
 
std::string deferred_copy_from_partitions_
 

Detailed Description

Definition at line 1517 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::CopyTableStmt::CopyTableStmt ( std::string *  t,
std::string *  f,
std::list< NameValueAssign * > *  o 
)

Definition at line 5316 of file ParserNode.cpp.

References options_.

5319  : table_(t), copy_from_source_pattern_(f), success_(true) {
5320  if (o) {
5321  for (const auto e : *o) {
5322  options_.emplace_back(e);
5323  }
5324  delete o;
5325  }
5326 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
constexpr double f
Definition: Utm.h:31
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557
Parser::CopyTableStmt::CopyTableStmt ( const rapidjson::Value &  payload)

Definition at line 5328 of file ParserNode.cpp.

References CHECK, copy_from_source_pattern_, json_str(), options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_options(), and table_.

5328  : success_(true) {
5329  CHECK(payload.HasMember("table"));
5330  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5331 
5332  CHECK(payload.HasMember("filePath"));
5333  std::string fs = json_str(payload["filePath"]);
5334  // strip leading/trailing spaces/quotes/single quotes
5335  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5336  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5337 
5338  parse_options(payload, options_);
5339 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
void parse_options(const rapidjson::Value &payload, std::list< std::unique_ptr< NameValueAssign >> &nameValueList, bool stringToNull=false, bool stringToInteger=false)
#define CHECK(condition)
Definition: Logger.h:222
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557

+ Here is the call graph for this function:

Member Function Documentation

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode 
)
override virtual

Implements Parser::DDLStmt.

Definition at line 5341 of file ParserNode.cpp.

References import_export::create_importer().

Referenced by heavydb.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runImport().

5342  {
5343  if (read_only_mode) {
5344  throw std::runtime_error("IMPORT invalid in read only mode.");
5345  }
5346  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5347  const TableDescriptor* td,
5348  const std::string& copy_from_source,
5349  const import_export::CopyParams& copy_params)
5350  -> std::unique_ptr<import_export::AbstractImporter> {
5351  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5352  };
5353  return execute(session, read_only_mode, importer_factory);
5354 }
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6279
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode,
const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &  importer_factory 
)

Definition at line 5356 of file ParserNode.cpp.

References CHECK, Executor::clearExternalCaches(), copy_from_source_pattern_, Catalog_Namespace::DBMetadata::dbId, deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, logger::ERROR, measure< TimeT >::execution(), legacylockmgr::ExecutorOuterLock, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), Catalog_Namespace::Catalog::getCurrentDB(), Executor::getExecutor(), legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), lockmgr::TableLockMgrImpl< T >::getWriteLockForTable(), ddl_utils::IMPORT, logger::INFO, AccessPrivileges::INSERT_INTO_TABLE, import_export::kGeoFile, import_export::kOdbc, import_export::kRasterFile, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, DBObject::loadKey(), LOG, import_export::CopyParams::max_reject, options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_copy_params(), return_message, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, DBObject::setPrivileges(), import_export::CopyParams::source_type, import_export::CopyParams::sql_order_by, import_export::CopyParams::sql_select, run_benchmark_import::start_time, success_, table_, TableDBObjectType, TableDescriptor::tableName, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, Catalog_Namespace::UserMetadata::userLoggable(), ddl_utils::validate_allowed_file_path(), and was_deferred_copy_from_.

Referenced by heavydb.cursor.Cursor::executemany().

5363  {
5364  if (read_only_mode) {
5365  throw std::runtime_error("COPY FROM invalid in read only mode.");
5366  }
5367 
5368  size_t total_time = 0;
5369 
5370  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
5371  const auto execute_read_lock =
5375 
5376  const TableDescriptor* td{nullptr};
5377  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5378  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5379 
5380  auto& catalog = session.getCatalog();
5381 
5382  try {
5383  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5385  catalog, *table_));
5386  td = (*td_with_lock)();
5387  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5389  } catch (const std::runtime_error& e) {
5390  // noop
5391  // TODO(adb): We're really only interested in whether the table exists or not.
5392  // Create a more refined exception.
5393  }
5394 
5395  // if the table already exists, it's locked, so check access privileges
5396  if (td) {
5397  std::vector<DBObject> privObjects;
5398  DBObject dbObject(*table_, TableDBObjectType);
5399  dbObject.loadKey(catalog);
5400  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
5401  privObjects.push_back(dbObject);
5402  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5403  throw std::runtime_error("Violation of access privileges: user " +
5404  session.get_currentUser().userLoggable() +
5405  " has no insert privileges for table " + *table_ + ".");
5406  }
5407 
5408  // invalidate cached item
5409  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
5410  }
5411 
5412  import_export::CopyParams copy_params;
5413  std::vector<std::string> warnings;
5415 
5416  boost::regex non_local_file_regex{R"(^\s*(s3|http|https)://.+)",
5417  boost::regex::extended | boost::regex::icase};
5418  if (!boost::regex_match(*copy_from_source_pattern_, non_local_file_regex) &&
5422  }
5423  // since we'll have not only posix file names but also s3/hdfs/... url
5424  // we do not expand wildcard or check file existence here.
5425  // from here on, copy_from_source contains something which may be a url
5426  // a wildcard of file names, or a sql select statement;
5427  std::string copy_from_source = *copy_from_source_pattern_;
5428 
5429  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5430  copy_params.sql_select = copy_from_source;
5431  if (copy_params.sql_order_by.empty()) {
5432  throw std::runtime_error(
5433  "Option \"SQL ORDER BY\" must be specified when copying from an ODBC source.");
5434  }
5435  }
5436 
5437  std::string tr;
5438 
5439  for (auto const& warning : warnings) {
5440  tr += warning + "\n";
5441  }
5442 
5443  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
5445  // geo import
5446  // we do nothing here, except stash the parameters so we can
5447  // do the import when we unwind to the top of the handler
5448  deferred_copy_from_file_name_ = copy_from_source;
5449  deferred_copy_from_copy_params_ = copy_params;
5450  was_deferred_copy_from_ = true;
5451 
5452  // the result string
5453  // @TODO simon.eves put something more useful in here
5454  // except we really can't because we haven't done the import yet!
5455  if (td) {
5456  tr += std::string("Appending geo to table '") + *table_ + std::string("'...");
5457  } else {
5458  tr += std::string("Creating table '") + *table_ +
5459  std::string("' and importing geo...");
5460  }
5461  } else {
5462  if (td) {
5463  CHECK(td_with_lock);
5464 
5465  // regular import
5466  auto importer = importer_factory(catalog, td, copy_from_source, copy_params);
5467  auto start_time = ::toString(std::chrono::system_clock::now());
5469  auto query_session = session.get_session_id();
5470  auto query_str = "COPYING " + td->tableName;
5472  executor->enrollQuerySession(query_session,
5473  query_str,
5474  start_time,
5476  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5477  }
5478 
5479  ScopeGuard clearInterruptStatus =
5480  [executor, &query_str, &query_session, &start_time, &importer] {
5481  // reset the runtime query interrupt status
5483  executor->clearQuerySessionStatus(query_session, start_time);
5484  }
5485  };
5486  import_export::ImportStatus import_result;
5487  auto ms =
5488  measure<>::execution([&]() { import_result = importer->import(&session); });
5489  total_time += ms;
5490  // results
5491  if (!import_result.load_failed &&
5492  import_result.rows_rejected > copy_params.max_reject) {
5493  LOG(ERROR) << "COPY exited early due to reject records count during multi file "
5494  "processing ";
5495  // if we have crossed the truncated load threshold
5496  import_result.load_failed = true;
5497  import_result.load_msg =
5498  "COPY exited early due to reject records count during multi file "
5499  "processing ";
5500  success_ = false;
5501  }
5502  if (!import_result.load_failed) {
5503  tr += std::string(
5504  "Loaded: " + std::to_string(import_result.rows_completed) +
5505  " recs, Rejected: " + std::to_string(import_result.rows_rejected) +
5506  " recs in " + std::to_string((double)total_time / 1000.0) + " secs");
5507  } else {
5508  tr += std::string("Loader Failed due to : " + import_result.load_msg + " in " +
5509  std::to_string((double)total_time / 1000.0) + " secs");
5510  }
5511  } else {
5512  throw std::runtime_error("Table '" + *table_ + "' must exist before COPY FROM");
5513  }
5514  }
5515  return_message.reset(new std::string(tr));
5516  LOG(INFO) << tr;
5517 }
static std::shared_ptr< WrapperType< MutexType > > getMutex(const LockType lockType, const KeyType &key)
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
#define LOG(tag)
Definition: Logger.h:216
std::unique_ptr< std::string > return_message
Definition: ParserNode.h:1531
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1562
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string to_string(char const *&&v)
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:468
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1560
std::string sql_order_by
Definition: CopyParams.h:96
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string toString(const Executor::ExtModuleKinds &kind)
Definition: Execute.h:1448
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:770
void parse_copy_params(const std::list< std::unique_ptr< NameValueAssign >> &options_, import_export::CopyParams &copy_params, std::vector< std::string > &warnings, std::string &deferred_copy_from_partitions_)
std::string get_session_id() const
Definition: SessionInfo.h:76
Catalog & getCatalog() const
Definition: SessionInfo.h:65
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1561
#define CHECK(condition)
Definition: Logger.h:222
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:391
std::string userLoggable() const
Definition: SysCatalog.cpp:127
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:71
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::get_deferred_copy_from_payload ( std::string &  table,
std::string &  file_name,
import_export::CopyParams &  copy_params,
std::string &  partitions 
)
inline

Definition at line 1542 of file ParserNode.h.

References deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, table_, and was_deferred_copy_from_.

1545  {
1546  table = *table_;
1547  file_name = deferred_copy_from_file_name_;
1548  copy_params = deferred_copy_from_copy_params_;
1549  partitions = deferred_copy_from_partitions_;
1550  was_deferred_copy_from_ = false;
1551  }
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1562
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1560
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1561
bool Parser::CopyTableStmt::get_success ( ) const
inline

Definition at line 1538 of file ParserNode.h.

References success_.

1538 { return success_; }
std::string& Parser::CopyTableStmt::get_table ( ) const
inline

Definition at line 1533 of file ParserNode.h.

References CHECK, and table_.

1533  {
1534  CHECK(table_);
1535  return *table_;
1536  }
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
#define CHECK(condition)
Definition: Logger.h:222
bool Parser::CopyTableStmt::was_deferred_copy_from ( ) const
inline

Definition at line 1540 of file ParserNode.h.

References was_deferred_copy_from_.

1540 { return was_deferred_copy_from_; }

Member Data Documentation

std::unique_ptr<std::string> Parser::CopyTableStmt::copy_from_source_pattern_
private

Definition at line 1555 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

import_export::CopyParams Parser::CopyTableStmt::deferred_copy_from_copy_params_
private

Definition at line 1561 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_file_name_
private

Definition at line 1560 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_partitions_
private

Definition at line 1562 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::list<std::unique_ptr<NameValueAssign> > Parser::CopyTableStmt::options_
private

Definition at line 1557 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

std::unique_ptr<std::string> Parser::CopyTableStmt::return_message

Definition at line 1531 of file ParserNode.h.

Referenced by execute().

bool Parser::CopyTableStmt::success_
private

Definition at line 1556 of file ParserNode.h.

Referenced by execute(), and get_success().

std::unique_ptr<std::string> Parser::CopyTableStmt::table_
private

Definition at line 1554 of file ParserNode.h.

Referenced by CopyTableStmt(), execute(), get_deferred_copy_from_payload(), and get_table().

bool Parser::CopyTableStmt::was_deferred_copy_from_ = false
private

Definition at line 1559 of file ParserNode.h.

Referenced by execute(), get_deferred_copy_from_payload(), and was_deferred_copy_from().


The documentation for this class was generated from the following files:
- ParserNode.h
- ParserNode.cpp