OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::CopyTableStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::CopyTableStmt:
+ Collaboration diagram for Parser::CopyTableStmt:

Public Member Functions

 CopyTableStmt (std::string *t, std::string *f, std::list< NameValueAssign * > *o)
 
 CopyTableStmt (const rapidjson::Value &payload)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode, const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &importer_factory)
 
std::string & get_table () const
 
bool get_success () const
 
bool was_deferred_copy_from () const
 
void get_deferred_copy_from_payload (std::string &table, std::string &file_name, import_export::CopyParams &copy_params, std::string &partitions)
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< std::string > return_message
 

Private Attributes

std::unique_ptr< std::string > table_
 
std::unique_ptr< std::string > copy_from_source_pattern_
 
bool success_
 
std::list< std::unique_ptr
< NameValueAssign > > 
options_
 
bool was_deferred_copy_from_ = false
 
std::string deferred_copy_from_file_name_
 
import_export::CopyParams deferred_copy_from_copy_params_
 
std::string deferred_copy_from_partitions_
 

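Detailed Description

Parser node for a COPY FROM statement. It stores the target table name, the source pattern (per the comments in execute(), this may be a file name, a URL, a wildcard of file names, or a SQL select statement) and the parsed options. execute() runs a regular import into an existing table and throws if the table does not exist; for geo file sources it only stashes the file name and copy parameters, which are later read back through get_deferred_copy_from_payload().

Definition at line 1473 of file ParserNode.h.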

Constructor & Destructor Documentation

Parser::CopyTableStmt::CopyTableStmt ( std::string *  t,
std::string *  f,
std::list< NameValueAssign * > *  o 
)

Definition at line 5394 of file ParserNode.cpp.

References options_.

5397  : table_(t), copy_from_source_pattern_(f), success_(true) {
5398  if (o) {
5399  for (const auto e : *o) {
5400  options_.emplace_back(e);
5401  }
5402  delete o;
5403  }
5404 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1511
constexpr double f
Definition: Utm.h:31
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1510
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1513
Parser::CopyTableStmt::CopyTableStmt ( const rapidjson::Value &  payload)

Definition at line 5406 of file ParserNode.cpp.

References CHECK, copy_from_source_pattern_, json_str(), options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_options(), and table_.

5406  : success_(true) {
5407  CHECK(payload.HasMember("table"));
5408  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5409 
5410  CHECK(payload.HasMember("filePath"));
5411  std::string fs = json_str(payload["filePath"]);
5412  // strip leading/trailing spaces, double quotes, single quotes and backticks
5413  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5414  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5415 
5416  parse_options(payload, options_);
5417 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1511
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:44
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1510
void parse_options(const rapidjson::Value &payload, std::list< std::unique_ptr< NameValueAssign >> &nameValueList, bool stringToNull=false, bool stringToInteger=false)
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1513

+ Here is the call graph for this function:

Member Function Documentation

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode 
)
overridevirtual

Implements Parser::DDLStmt.

Definition at line 5419 of file ParserNode.cpp.

References import_export::create_importer().

Referenced by heavydb.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runImport().

5420  {
5421  if (read_only_mode) {
5422  throw std::runtime_error("IMPORT invalid in read only mode.");
5423  }
5424  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5425  const TableDescriptor* td,
5426  const std::string& copy_from_source,
5427  const import_export::CopyParams& copy_params)
5428  -> std::unique_ptr<import_export::AbstractImporter> {
5429  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5430  };
5431  return execute(session, read_only_mode, importer_factory);
5432 }
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:132
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6302
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode,
const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &  importer_factory 
)

Definition at line 5434 of file ParserNode.cpp.

References CHECK, Executor::clearExternalCaches(), copy_from_source_pattern_, deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, logger::ERROR, measure< TimeT >::execution(), legacylockmgr::ExecutorOuterLock, g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), Executor::getExecutor(), legacylockmgr::LockMgr< MutexType, KeyType >::getMutex(), lockmgr::TableLockMgrImpl< T >::getWriteLockForTable(), ddl_utils::IMPORT, logger::INFO, AccessPrivileges::INSERT_INTO_TABLE, import_export::kGeoFile, import_export::kOdbc, import_export::kRasterFile, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, DBObject::loadKey(), LOG, import_export::CopyParams::max_reject, options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_copy_params(), return_message, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, DBObject::setPrivileges(), import_export::CopyParams::source_type, import_export::CopyParams::sql_order_by, import_export::CopyParams::sql_select, run_benchmark_import::start_time, success_, table_, TableDBObjectType, TableDescriptor::tableName, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, Catalog_Namespace::UserMetadata::userLoggable(), ddl_utils::validate_allowed_file_path(), and was_deferred_copy_from_.

Referenced by heavydb.cursor.Cursor::executemany().

5441  {
5442  if (read_only_mode) {
5443  throw std::runtime_error("COPY FROM invalid in read only mode.");
5444  }
5445 
5446  size_t total_time = 0;
5447 
5448  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
5449  const auto execute_read_lock =
5453 
5454  const TableDescriptor* td{nullptr};
5455  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5456  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5457 
5458  auto& catalog = session.getCatalog();
5459 
5460  try {
5461  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5463  catalog, *table_));
5464  td = (*td_with_lock)();
5465  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5467  } catch (const std::runtime_error& e) {
5468  // noop
5469  // TODO(adb): We're really only interested in whether the table exists or not.
5470  // Create a more refined exception.
5471  }
5472 
5473  // if the table already exists, it's locked, so check access privileges
5474  if (td) {
5475  std::vector<DBObject> privObjects;
5476  DBObject dbObject(*table_, TableDBObjectType);
5477  dbObject.loadKey(catalog);
5478  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
5479  privObjects.push_back(dbObject);
5480  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5481  throw std::runtime_error("Violation of access privileges: user " +
5482  session.get_currentUser().userLoggable() +
5483  " has no insert privileges for table " + *table_ + ".");
5484  }
5485 
5486  // invalidate cached item
5487  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
5488  }
5489 
5490  import_export::CopyParams copy_params;
5491  std::vector<std::string> warnings;
5493 
5494  boost::regex non_local_file_regex{R"(^\s*(s3|http|https)://.+)",
5495  boost::regex::extended | boost::regex::icase};
5496  if (!boost::regex_match(*copy_from_source_pattern_, non_local_file_regex) &&
5500  }
5501  // Since the source may be not only a POSIX file name but also an s3/hdfs/... URL,
5502  // we do not expand wildcards or check file existence here.
5503  // From here on, copy_from_source contains something which may be a URL,
5504  // a wildcard of file names, or a SQL select statement.
5505  std::string copy_from_source = *copy_from_source_pattern_;
5506 
5507  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5508  copy_params.sql_select = copy_from_source;
5509  if (copy_params.sql_order_by.empty()) {
5510  throw std::runtime_error(
5511  "Option \"SQL ORDER BY\" must be specified when copying from an ODBC source.");
5512  }
5513  }
5514 
5515  std::string tr;
5516 
5517  for (auto const& warning : warnings) {
5518  tr += warning + "\n";
5519  }
5520 
5521  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
5523  // geo import
5524  // we do nothing here, except stash the parameters so we can
5525  // do the import when we unwind to the top of the handler
5526  deferred_copy_from_file_name_ = copy_from_source;
5527  deferred_copy_from_copy_params_ = copy_params;
5528  was_deferred_copy_from_ = true;
5529 
5530  // the result string
5531  // @TODO simon.eves put something more useful in here
5532  // except we really can't because we haven't done the import yet!
5533  if (td) {
5534  tr += std::string("Appending geo to table '") + *table_ + std::string("'...");
5535  } else {
5536  tr += std::string("Creating table '") + *table_ +
5537  std::string("' and importing geo...");
5538  }
5539  } else {
5540  if (td) {
5541  CHECK(td_with_lock);
5542 
5543  // regular import
5544  auto importer = importer_factory(catalog, td, copy_from_source, copy_params);
5545  auto start_time = ::toString(std::chrono::system_clock::now());
5547  auto query_session = session.get_session_id();
5548  auto query_str = "COPYING " + td->tableName;
5550  executor->enrollQuerySession(query_session,
5551  query_str,
5552  start_time,
5554  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5555  }
5556 
5557  ScopeGuard clearInterruptStatus =
5558  [executor, &query_str, &query_session, &start_time, &importer] {
5559  // reset the runtime query interrupt status
5561  executor->clearQuerySessionStatus(query_session, start_time);
5562  }
5563  };
5564  import_export::ImportStatus import_result;
5565  auto ms =
5566  measure<>::execution([&]() { import_result = importer->import(&session); });
5567  total_time += ms;
5568  // results
5569  if (!import_result.load_failed &&
5570  import_result.rows_rejected > copy_params.max_reject) {
5571  LOG(ERROR) << "COPY exited early due to reject records count during multi file "
5572  "processing ";
5573  // if we have crossed the truncated load threshold
5574  import_result.load_failed = true;
5575  import_result.load_msg =
5576  "COPY exited early due to reject records count during multi file "
5577  "processing ";
5578  success_ = false;
5579  }
5580  if (!import_result.load_failed) {
5581  tr += std::string(
5582  "Loaded: " + std::to_string(import_result.rows_completed) +
5583  " recs, Rejected: " + std::to_string(import_result.rows_rejected) +
5584  " recs in " + std::to_string((double)total_time / 1000.0) + " secs");
5585  } else {
5586  tr += std::string("Loader Failed due to : " + import_result.load_msg + " in " +
5587  std::to_string((double)total_time / 1000.0) + " secs");
5588  }
5589  } else {
5590  throw std::runtime_error("Table '" + *table_ + "' must exist before COPY FROM");
5591  }
5592  }
5593  return_message.reset(new std::string(tr));
5594  LOG(INFO) << tr;
5595 }
static std::shared_ptr< WrapperType< MutexType > > getMutex(const LockType lockType, const KeyType &key)
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1511
#define LOG(tag)
Definition: Logger.h:285
std::unique_ptr< std::string > return_message
Definition: ParserNode.h:1487
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1518
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:126
std::string to_string(char const *&&v)
std::shared_lock< T > shared_lock
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:475
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1516
std::string sql_order_by
Definition: CopyParams.h:97
import_export::SourceType source_type
Definition: CopyParams.h:57
std::string toString(const ExecutorDeviceType &device_type)
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1510
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:785
void parse_copy_params(const std::list< std::unique_ptr< NameValueAssign >> &options_, import_export::CopyParams &copy_params, std::vector< std::string > &warnings, std::string &deferred_copy_from_partitions_)
std::string get_session_id() const
Definition: SessionInfo.h:93
Catalog & getCatalog() const
Definition: SessionInfo.h:75
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1517
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1513
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:388
std::string userLoggable() const
Definition: SysCatalog.cpp:158
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:373

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::get_deferred_copy_from_payload ( std::string &  table,
std::string &  file_name,
import_export::CopyParams &  copy_params,
std::string &  partitions 
)
inline

Definition at line 1498 of file ParserNode.h.

References deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, table_, and was_deferred_copy_from_.

1501  {
1502  table = *table_;
1503  file_name = deferred_copy_from_file_name_;
1504  copy_params = deferred_copy_from_copy_params_;
1505  partitions = deferred_copy_from_partitions_;
1506  was_deferred_copy_from_ = false;
1507  }
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1518
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1516
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1510
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1517
bool Parser::CopyTableStmt::get_success ( ) const
inline

Definition at line 1494 of file ParserNode.h.

References success_.

1494 { return success_; }
std::string& Parser::CopyTableStmt::get_table ( ) const
inline

Definition at line 1489 of file ParserNode.h.

References CHECK, and table_.

1489  {
1490  CHECK(table_);
1491  return *table_;
1492  }
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1510
#define CHECK(condition)
Definition: Logger.h:291
bool Parser::CopyTableStmt::was_deferred_copy_from ( ) const
inline

Definition at line 1496 of file ParserNode.h.

References was_deferred_copy_from_.

1496 { return was_deferred_copy_from_; }

Member Data Documentation

std::unique_ptr<std::string> Parser::CopyTableStmt::copy_from_source_pattern_
private

Definition at line 1511 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

import_export::CopyParams Parser::CopyTableStmt::deferred_copy_from_copy_params_
private

Definition at line 1517 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_file_name_
private

Definition at line 1516 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_partitions_
private

Definition at line 1518 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::list<std::unique_ptr<NameValueAssign> > Parser::CopyTableStmt::options_
private

Definition at line 1513 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

std::unique_ptr<std::string> Parser::CopyTableStmt::return_message

Definition at line 1487 of file ParserNode.h.

Referenced by execute().

bool Parser::CopyTableStmt::success_
private

Definition at line 1512 of file ParserNode.h.

Referenced by execute(), and get_success().

std::unique_ptr<std::string> Parser::CopyTableStmt::table_
private

Definition at line 1510 of file ParserNode.h.

Referenced by CopyTableStmt(), execute(), get_deferred_copy_from_payload(), and get_table().

bool Parser::CopyTableStmt::was_deferred_copy_from_ = false
private

Definition at line 1515 of file ParserNode.h.

Referenced by execute(), get_deferred_copy_from_payload(), and was_deferred_copy_from().


The documentation for this class was generated from the following files: