OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::CopyTableStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::CopyTableStmt:
+ Collaboration diagram for Parser::CopyTableStmt:

Public Member Functions

 CopyTableStmt (std::string *t, std::string *f, std::list< NameValueAssign * > *o)
 
 CopyTableStmt (const rapidjson::Value &payload)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode, const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &importer_factory)
 
std::string & get_table () const
 
bool get_success () const
 
bool was_deferred_copy_from () const
 
void get_deferred_copy_from_payload (std::string &table, std::string &file_name, import_export::CopyParams &copy_params, std::string &partitions)
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< std::string > return_message
 

Private Attributes

std::unique_ptr< std::string > table_
 
std::unique_ptr< std::string > copy_from_source_pattern_
 
bool success_
 
std::list< std::unique_ptr< NameValueAssign > > options_
 
bool was_deferred_copy_from_ = false
 
std::string deferred_copy_from_file_name_
 
import_export::CopyParams deferred_copy_from_copy_params_
 
std::string deferred_copy_from_partitions_
 

Detailed Description

Definition at line 1517 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::CopyTableStmt::CopyTableStmt ( std::string *  t,
std::string *  f,
std::list< NameValueAssign * > *  o 
)

Definition at line 5367 of file ParserNode.cpp.

References options_.

5370  : table_(t), copy_from_source_pattern_(f), success_(true) {
5371  if (o) {
5372  for (const auto e : *o) {
5373  options_.emplace_back(e);
5374  }
5375  delete o;
5376  }
5377 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
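constexpr double f
Definition: Utm.h:31
(Doxygen cross-reference artifact: this Utm.h constant is unrelated to the constructor's std::string *f parameter.)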
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557
Parser::CopyTableStmt::CopyTableStmt ( const rapidjson::Value &  payload)

Definition at line 5379 of file ParserNode.cpp.

References CHECK, copy_from_source_pattern_, json_str(), options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_options(), and table_.

5379  : success_(true) {
5380  CHECK(payload.HasMember("table"));
5381  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5382 
5383  CHECK(payload.HasMember("filePath"));
5384  std::string fs = json_str(payload["filePath"]);
5385  // strip leading/trailing spaces/quotes/single quotes
5386  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5387  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5388 
5389  parse_options(payload, options_);
5390 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
void parse_options(const rapidjson::Value &payload, std::list< std::unique_ptr< NameValueAssign >> &nameValueList, bool stringToNull=false, bool stringToInteger=false)
#define CHECK(condition)
Definition: Logger.h:222
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557

+ Here is the call graph for this function:

Member Function Documentation

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo session,
bool  read_only_mode 
)
overridevirtual

Implements Parser::DDLStmt.

Definition at line 5392 of file ParserNode.cpp.

References import_export::create_importer().

Referenced by heavydb.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runImport().

5393  {
5394  if (read_only_mode) {
5395  throw std::runtime_error("IMPORT invalid in read only mode.");
5396  }
5397  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5398  const TableDescriptor* td,
5399  const std::string& copy_from_source,
5400  const import_export::CopyParams& copy_params)
5401  -> std::unique_ptr<import_export::AbstractImporter> {
5402  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5403  };
5404  return execute(session, read_only_mode, importer_factory);
5405 }
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6331
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo session,
bool  read_only_mode,
const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &  importer_factory 
)

Definition at line 5407 of file ParserNode.cpp.

References CHECK, Executor::clearExternalCaches(), copy_from_source_pattern_, Catalog_Namespace::DBMetadata::dbId, deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, logger::ERROR, measure< TimeT >::execution(), legacylockmgr::ExecutorOuterLock, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), Catalog_Namespace::Catalog::getCurrentDB(), Executor::getExecutor(), legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), lockmgr::TableLockMgrImpl< T >::getWriteLockForTable(), ddl_utils::IMPORT, logger::INFO, AccessPrivileges::INSERT_INTO_TABLE, import_export::kGeoFile, import_export::kOdbc, import_export::kRasterFile, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, DBObject::loadKey(), LOG, import_export::CopyParams::max_reject, options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_copy_params(), return_message, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, DBObject::setPrivileges(), import_export::CopyParams::source_type, import_export::CopyParams::sql_order_by, import_export::CopyParams::sql_select, run_benchmark_import::start_time, success_, table_, TableDBObjectType, TableDescriptor::tableName, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, Catalog_Namespace::UserMetadata::userLoggable(), ddl_utils::validate_allowed_file_path(), and was_deferred_copy_from_.

Referenced by heavydb.cursor.Cursor::executemany().

5414  {
5415  if (read_only_mode) {
5416  throw std::runtime_error("COPY FROM invalid in read only mode.");
5417  }
5418 
5419  size_t total_time = 0;
5420 
5421  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
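5422  const auto execute_read_lock =
      [5423-5425 not captured in this extract; per the References list they presumably take a shared lock on the mutex returned by legacylockmgr::LockMgr::getMutex(legacylockmgr::ExecutorOuterLock, ...)]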
5426 
5427  const TableDescriptor* td{nullptr};
5428  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5429  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5430 
5431  auto& catalog = session.getCatalog();
5432 
5433  try {
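5434  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
      [5435 not captured in this extract: the call, taking (catalog, *table_), that yields the read-locked table descriptor container]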
5436  catalog, *table_));
5437  td = (*td_with_lock)();
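5438  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
      [5439 not captured in this extract: per the References list, presumably a lockmgr::TableLockMgrImpl<T>::getWriteLockForTable(catalog, *table_) call]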
5440  } catch (const std::runtime_error& e) {
5441  // noop
5442  // TODO(adb): We're really only interested in whether the table exists or not.
5443  // Create a more refined exception.
5444  }
5445 
5446  // if the table already exists, it's locked, so check access privileges
5447  if (td) {
5448  std::vector<DBObject> privObjects;
5449  DBObject dbObject(*table_, TableDBObjectType);
5450  dbObject.loadKey(catalog);
5451  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
5452  privObjects.push_back(dbObject);
5453  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5454  throw std::runtime_error("Violation of access privileges: user " +
5455  session.get_currentUser().userLoggable() +
5456  " has no insert privileges for table " + *table_ + ".");
5457  }
5458 
5459  // invalidate cached item
5460  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
5461  }
5462 
5463  import_export::CopyParams copy_params;
5464  std::vector<std::string> warnings;
5465  parse_copy_params(options_, copy_params, warnings, deferred_copy_from_partitions_);
5466 
5467  boost::regex non_local_file_regex{R"(^\s*(s3|http|https)://.+)",
5468  boost::regex::extended | boost::regex::icase};
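5469  if (!boost::regex_match(*copy_from_source_pattern_, non_local_file_regex) &&
      [5470-5472 not captured in this extract: the rest of the condition and, per the References list, a call to ddl_utils::validate_allowed_file_path(..., ddl_utils::IMPORT, ...)]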
5473  }
5474  // since we'll have not only posix file names but also s3/hdfs/... url
5475  // we do not expand wildcard or check file existence here.
5476  // from here on, copy_from_source contains something which may be a url
5477  // a wildcard of file names, or a sql select statement;
5478  std::string copy_from_source = *copy_from_source_pattern_;
5479 
5480  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5481  copy_params.sql_select = copy_from_source;
5482  if (copy_params.sql_order_by.empty()) {
5483  throw std::runtime_error(
5484  "Option \"SQL ORDER BY\" must be specified when copying from an ODBC source.");
5485  }
5486  }
5487 
5488  std::string tr;
5489 
5490  for (auto const& warning : warnings) {
5491  tr += warning + "\n";
5492  }
5493 
5494  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
5495  copy_params.source_type == import_export::SourceType::kRasterFile) {
5496  // geo import
5497  // we do nothing here, except stash the parameters so we can
5498  // do the import when we unwind to the top of the handler
5499  deferred_copy_from_file_name_ = copy_from_source;
5500  deferred_copy_from_copy_params_ = copy_params;
5501  was_deferred_copy_from_ = true;
5502 
5503  // the result string
5504  // @TODO simon.eves put something more useful in here
5505  // except we really can't because we haven't done the import yet!
5506  if (td) {
5507  tr += std::string("Appending geo to table '") + *table_ + std::string("'...");
5508  } else {
5509  tr += std::string("Creating table '") + *table_ +
5510  std::string("' and importing geo...");
5511  }
5512  } else {
5513  if (td) {
5514  CHECK(td_with_lock);
5515 
5516  // regular import
5517  auto importer = importer_factory(catalog, td, copy_from_source, copy_params);
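5518  auto start_time = ::toString(std::chrono::system_clock::now());
      [5519 not captured in this extract: declares the `executor` used below, presumably via Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID)]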
5520  auto query_session = session.get_session_id();
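5521  auto query_str = "COPYING " + td->tableName;
      [5522 not captured in this extract: opens the block closed at 5528, presumably a g_enable_non_kernel_time_query_interrupt check]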
5523  executor->enrollQuerySession(query_session,
5524  query_str,
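5525  start_time,
      [5526 not captured in this extract: one further enrollQuerySession argument, presumably Executor::UNITARY_EXECUTOR_ID]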
5527  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5528  }
5529 
5530  ScopeGuard clearInterruptStatus =
5531  [executor, &query_str, &query_session, &start_time, &importer] {
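5532  // reset the runtime query interrupt status
      [5533 not captured in this extract: opens the block closed at 5535, presumably the same g_enable_non_kernel_time_query_interrupt check]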
5534  executor->clearQuerySessionStatus(query_session, start_time);
5535  }
5536  };
5537  import_export::ImportStatus import_result;
5538  auto ms =
5539  measure<>::execution([&]() { import_result = importer->import(&session); });
5540  total_time += ms;
5541  // results
5542  if (!import_result.load_failed &&
5543  import_result.rows_rejected > copy_params.max_reject) {
5544  LOG(ERROR) << "COPY exited early due to reject records count during multi file "
5545  "processing ";
5546  // if we have crossed the truncated load threshold
5547  import_result.load_failed = true;
5548  import_result.load_msg =
5549  "COPY exited early due to reject records count during multi file "
5550  "processing ";
5551  success_ = false;
5552  }
5553  if (!import_result.load_failed) {
5554  tr += std::string(
5555  "Loaded: " + std::to_string(import_result.rows_completed) +
5556  " recs, Rejected: " + std::to_string(import_result.rows_rejected) +
5557  " recs in " + std::to_string((double)total_time / 1000.0) + " secs");
5558  } else {
5559  tr += std::string("Loader Failed due to : " + import_result.load_msg + " in " +
5560  std::to_string((double)total_time / 1000.0) + " secs");
5561  }
5562  } else {
5563  throw std::runtime_error("Table '" + *table_ + "' must exist before COPY FROM");
5564  }
5565  }
5566  return_message.reset(new std::string(tr));
5567  LOG(INFO) << tr;
5568 }
static std::shared_ptr< WrapperType< MutexType > > getMutex(const LockType lockType, const KeyType &key)
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1555
#define LOG(tag)
Definition: Logger.h:216
std::unique_ptr< std::string > return_message
Definition: ParserNode.h:1531
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1562
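std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
(Doxygen cross-reference artifact: execute() calls ::toString on a std::chrono::system_clock time point, not this QueryDescriptionType overload.)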
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string to_string(char const *&&v)
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:477
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1560
std::string sql_order_by
Definition: CopyParams.h:97
import_export::SourceType source_type
Definition: CopyParams.h:57
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:785
void parse_copy_params(const std::list< std::unique_ptr< NameValueAssign >> &options_, import_export::CopyParams &copy_params, std::vector< std::string > &warnings, std::string &deferred_copy_from_partitions_)
std::string get_session_id() const
Definition: SessionInfo.h:93
Catalog & getCatalog() const
Definition: SessionInfo.h:75
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1561
#define CHECK(condition)
Definition: Logger.h:222
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1557
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:391
std::string userLoggable() const
Definition: SysCatalog.cpp:128
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:376

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::get_deferred_copy_from_payload ( std::string &  table,
std::string &  file_name,
import_export::CopyParams copy_params,
std::string &  partitions 
)
inline

Definition at line 1542 of file ParserNode.h.

References deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, table_, and was_deferred_copy_from_.

1545  {
1546  table = *table_;
1547  file_name = deferred_copy_from_file_name_;
1548  copy_params = deferred_copy_from_copy_params_;
1549  partitions = deferred_copy_from_partitions_;
1550  was_deferred_copy_from_ = false;
1551  }
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1562
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1560
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1561
bool Parser::CopyTableStmt::get_success ( ) const
inline

Definition at line 1538 of file ParserNode.h.

References success_.

1538 { return success_; }
std::string& Parser::CopyTableStmt::get_table ( ) const
inline

Definition at line 1533 of file ParserNode.h.

References CHECK, and table_.

1533  {
1534  CHECK(table_);
1535  return *table_;
1536  }
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1554
#define CHECK(condition)
Definition: Logger.h:222
bool Parser::CopyTableStmt::was_deferred_copy_from ( ) const
inline

Definition at line 1540 of file ParserNode.h.

References was_deferred_copy_from_.

1540 { return was_deferred_copy_from_; }

Member Data Documentation

std::unique_ptr<std::string> Parser::CopyTableStmt::copy_from_source_pattern_
private

Definition at line 1555 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

import_export::CopyParams Parser::CopyTableStmt::deferred_copy_from_copy_params_
private

Definition at line 1561 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_file_name_
private

Definition at line 1560 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_partitions_
private

Definition at line 1562 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::list<std::unique_ptr<NameValueAssign> > Parser::CopyTableStmt::options_
private

Definition at line 1557 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

std::unique_ptr<std::string> Parser::CopyTableStmt::return_message

Definition at line 1531 of file ParserNode.h.

Referenced by execute().

bool Parser::CopyTableStmt::success_
private

Definition at line 1556 of file ParserNode.h.

Referenced by execute(), and get_success().

std::unique_ptr<std::string> Parser::CopyTableStmt::table_
private

Definition at line 1554 of file ParserNode.h.

Referenced by CopyTableStmt(), execute(), get_deferred_copy_from_payload(), and get_table().

bool Parser::CopyTableStmt::was_deferred_copy_from_ = false
private

Definition at line 1559 of file ParserNode.h.

Referenced by execute(), get_deferred_copy_from_payload(), and was_deferred_copy_from().


The documentation for this class was generated from the following files:
ParserNode.h
ParserNode.cpp