OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Parser::CopyTableStmt Class Reference

#include <ParserNode.h>

+ Inheritance diagram for Parser::CopyTableStmt:
+ Collaboration diagram for Parser::CopyTableStmt:

Public Member Functions

 CopyTableStmt (std::string *t, std::string *f, std::list< NameValueAssign * > *o)
 
 CopyTableStmt (const rapidjson::Value &payload)
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override
 
void execute (const Catalog_Namespace::SessionInfo &session, bool read_only_mode, const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &importer_factory)
 
std::string & get_table () const
 
bool get_success () const
 
bool was_deferred_copy_from () const
 
void get_deferred_copy_from_payload (std::string &table, std::string &file_name, import_export::CopyParams &copy_params, std::string &partitions)
 
- Public Member Functions inherited from Parser::DDLStmt
void setColumnDescriptor (ColumnDescriptor &cd, const ColumnDef *coldef)
 
- Public Member Functions inherited from Parser::Node
virtual ~Node ()
 

Public Attributes

std::unique_ptr< std::string > return_message
 

Private Attributes

std::unique_ptr< std::string > table_
 
std::unique_ptr< std::string > copy_from_source_pattern_
 
bool success_
 
std::list< std::unique_ptr< NameValueAssign > > options_
 
bool was_deferred_copy_from_ = false
 
std::string deferred_copy_from_file_name_
 
import_export::CopyParams deferred_copy_from_copy_params_
 
std::string deferred_copy_from_partitions_
 

Detailed Description

Definition at line 1460 of file ParserNode.h.

Constructor & Destructor Documentation

Parser::CopyTableStmt::CopyTableStmt ( std::string *  t,
std::string *  f,
std::list< NameValueAssign * > *  o 
)

Definition at line 5699 of file ParserNode.cpp.

References options_.

5702  : table_(t), copy_from_source_pattern_(f), success_(true) {
5703  if (o) {
5704  for (const auto e : *o) {
5705  options_.emplace_back(e);
5706  }
5707  delete o;
5708  }
5709 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
std::string *f (constructor parameter): the COPY FROM source path or pattern; ownership is transferred to copy_from_source_pattern_.
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500
Parser::CopyTableStmt::CopyTableStmt ( const rapidjson::Value &  payload)

Definition at line 5711 of file ParserNode.cpp.

References CHECK, copy_from_source_pattern_, json_str(), options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_options(), and table_.

5711  : success_(true) {
5712  CHECK(payload.HasMember("table"));
5713  table_ = std::make_unique<std::string>(json_str(payload["table"]));
5714 
5715  CHECK(payload.HasMember("filePath"));
5716  std::string fs = json_str(payload["filePath"]);
5717  // strip leading/trailing spaces/quotes/single quotes
5718  boost::algorithm::trim_if(fs, boost::is_any_of(" \"'`"));
5719  copy_from_source_pattern_ = std::make_unique<std::string>(fs);
5720 
5721  parse_options(payload, options_);
5722 }
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
const std::string json_str(const rapidjson::Value &obj) noexcept
Definition: JsonAccessors.h:46
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
void parse_options(const rapidjson::Value &payload, std::list< std::unique_ptr< NameValueAssign >> &nameValueList, bool stringToNull=false, bool stringToInteger=false)
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500

+ Here is the call graph for this function:

Member Function Documentation

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode 
)
override virtual

Implements Parser::DDLStmt.

Definition at line 5724 of file ParserNode.cpp.

References import_export::create_importer().

Referenced by heavydb.cursor.Cursor::executemany(), and QueryRunner::QueryRunner::runImport().

5725  {
5726  if (read_only_mode) {
5727  throw std::runtime_error("IMPORT invalid in read only mode.");
5728  }
5729  auto importer_factory = [](Catalog_Namespace::Catalog& catalog,
5730  const TableDescriptor* td,
5731  const std::string& copy_from_source,
5732  const import_export::CopyParams& copy_params)
5733  -> std::unique_ptr<import_export::AbstractImporter> {
5734  return import_export::create_importer(catalog, td, copy_from_source, copy_params);
5735  };
5736  return execute(session, read_only_mode, importer_factory);
5737 }
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:143
std::unique_ptr< AbstractImporter > create_importer(Catalog_Namespace::Catalog &catalog, const TableDescriptor *td, const std::string &copy_from_source, const import_export::CopyParams &copy_params)
Definition: Importer.cpp:6211
void execute(const Catalog_Namespace::SessionInfo &session, bool read_only_mode) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::execute ( const Catalog_Namespace::SessionInfo &  session,
bool  read_only_mode,
const std::function< std::unique_ptr< import_export::AbstractImporter >(Catalog_Namespace::Catalog &, const TableDescriptor *, const std::string &, const import_export::CopyParams &)> &  importer_factory 
)

Definition at line 5739 of file ParserNode.cpp.

References CHECK, Executor::clearExternalCaches(), copy_from_source_pattern_, deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, logger::ERROR, measure< TimeT >::execution(), g_enable_non_kernel_time_query_interrupt, Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), legacylockmgr::getExecuteReadLock(), Executor::getExecutor(), lockmgr::TableLockMgrImpl< T >::getWriteLockForTable(), ddl_utils::IMPORT, logger::INFO, AccessPrivileges::INSERT_INTO_TABLE, import_export::kGeoFile, import_export::kOdbc, import_export::kRasterFile, import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, DBObject::loadKey(), LOG, import_export::CopyParams::max_reject, options_, Parser::anonymous_namespace{ParserNode.cpp}::parse_copy_params(), return_message, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, DBObject::setPrivileges(), import_export::CopyParams::source_type, import_export::CopyParams::sql_order_by, import_export::CopyParams::sql_select, run_benchmark_import::start_time, success_, table_, TableDBObjectType, TableDescriptor::tableName, to_string(), toString(), Executor::UNITARY_EXECUTOR_ID, Catalog_Namespace::UserMetadata::userLoggable(), ddl_utils::validate_allowed_file_path(), and was_deferred_copy_from_.

Referenced by heavydb.cursor.Cursor::executemany().

5746  {
5747  if (read_only_mode) {
5748  throw std::runtime_error("COPY FROM invalid in read only mode.");
5749  }
5750 
5751  size_t total_time = 0;
5752 
5753  // Prevent simultaneous import / truncate (see TruncateTableStmt::execute)
5754  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
5755 
5756  const TableDescriptor* td{nullptr};
5757  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5758  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5759 
5760  auto& catalog = session.getCatalog();
5761 
5762  try {
5763  td_with_lock = std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5765  catalog, *table_));
5766  td = (*td_with_lock)();
5767  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5769  } catch (const std::runtime_error& e) {
5770  // noop
5771  // TODO(adb): We're really only interested in whether the table exists or not.
5772  // Create a more refined exception.
5773  }
5774 
5775  // if the table already exists, it's locked, so check access privileges
5776  if (td) {
5777  std::vector<DBObject> privObjects;
5778  DBObject dbObject(*table_, TableDBObjectType);
5779  dbObject.loadKey(catalog);
5780  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
5781  privObjects.push_back(dbObject);
5782  if (!SysCatalog::instance().checkPrivileges(session.get_currentUser(), privObjects)) {
5783  throw std::runtime_error("Violation of access privileges: user " +
5784  session.get_currentUser().userLoggable() +
5785  " has no insert privileges for table " + *table_ + ".");
5786  }
5787 
5788  // invalidate cached item
5789  Executor::clearExternalCaches(true, td, catalog.getCurrentDB().dbId);
5790  }
5791 
5792  import_export::CopyParams copy_params;
5793  std::vector<std::string> warnings;
5795 
5796  boost::regex non_local_file_regex{R"(^\s*(s3|http|https)://.+)",
5797  boost::regex::extended | boost::regex::icase};
5798  if (!boost::regex_match(*copy_from_source_pattern_, non_local_file_regex) &&
5802  }
5803  // since we'll have not only posix file names but also s3/hdfs/... url
5804  // we do not expand wildcard or check file existence here.
5805  // from here on, copy_from_source contains something which may be a url
5806  // a wildcard of file names, or a sql select statement;
5807  std::string copy_from_source = *copy_from_source_pattern_;
5808 
5809  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5810  copy_params.sql_select = copy_from_source;
5811  if (copy_params.sql_order_by.empty()) {
5812  throw std::runtime_error(
5813  "Option \"SQL ORDER BY\" must be specified when copying from an ODBC source.");
5814  }
5815  }
5816 
5817  std::string tr;
5818 
5819  for (auto const& warning : warnings) {
5820  tr += warning + "\n";
5821  }
5822 
5823  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
5825  // geo import
5826  // we do nothing here, except stash the parameters so we can
5827  // do the import when we unwind to the top of the handler
5828  deferred_copy_from_file_name_ = copy_from_source;
5829  deferred_copy_from_copy_params_ = copy_params;
5830  was_deferred_copy_from_ = true;
5831 
5832  // the result string
5833  // @TODO simon.eves put something more useful in here
5834  // except we really can't because we haven't done the import yet!
5835  if (td) {
5836  tr += std::string("Appending geo to table '") + *table_ + std::string("'...");
5837  } else {
5838  tr += std::string("Creating table '") + *table_ +
5839  std::string("' and importing geo...");
5840  }
5841  } else {
5842  if (td) {
5843  CHECK(td_with_lock);
5844 
5845  // regular import
5846  auto importer = importer_factory(catalog, td, copy_from_source, copy_params);
5847  auto start_time = ::toString(std::chrono::system_clock::now());
5849  auto query_session = session.get_session_id();
5850  auto query_str = "COPYING " + td->tableName;
5852  executor->enrollQuerySession(query_session,
5853  query_str,
5854  start_time,
5856  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5857  }
5858 
5859  ScopeGuard clearInterruptStatus =
5860  [executor, &query_str, &query_session, &start_time, &importer] {
5861  // reset the runtime query interrupt status
5863  executor->clearQuerySessionStatus(query_session, start_time);
5864  }
5865  };
5866  import_export::ImportStatus import_result;
5867  auto ms =
5868  measure<>::execution([&]() { import_result = importer->import(&session); });
5869  total_time += ms;
5870  // results
5871  if (!import_result.load_failed &&
5872  import_result.rows_rejected > copy_params.max_reject) {
5873  LOG(ERROR) << "COPY exited early due to reject records count during multi file "
5874  "processing ";
5875  // if we have crossed the truncated load threshold
5876  import_result.load_failed = true;
5877  import_result.load_msg =
5878  "COPY exited early due to reject records count during multi file "
5879  "processing ";
5880  success_ = false;
5881  }
5882  if (!import_result.load_failed) {
5883  tr += std::string(
5884  "Loaded: " + std::to_string(import_result.rows_completed) +
5885  " recs, Rejected: " + std::to_string(import_result.rows_rejected) +
5886  " recs in " + std::to_string((double)total_time / 1000.0) + " secs");
5887  } else {
5888  tr += std::string("Loader Failed due to : " + import_result.load_msg + " in " +
5889  std::to_string((double)total_time / 1000.0) + " secs");
5890  }
5891  } else {
5892  throw std::runtime_error("Table '" + *table_ + "' must exist before COPY FROM");
5893  }
5894  }
5895  return_message.reset(new std::string(tr));
5896  LOG(INFO) << tr;
5897 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::unique_ptr< std::string > copy_from_source_pattern_
Definition: ParserNode.h:1498
auto getExecuteReadLock()
#define LOG(tag)
Definition: Logger.h:285
std::unique_ptr< std::string > return_message
Definition: ParserNode.h:1474
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1505
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:134
std::string to_string(char const *&&v)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:509
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1503
std::string sql_order_by
Definition: CopyParams.h:97
import_export::SourceType source_type
Definition: CopyParams.h:57
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:822
void parse_copy_params(const std::list< std::unique_ptr< NameValueAssign >> &options_, import_export::CopyParams &copy_params, std::vector< std::string > &warnings, std::string &deferred_copy_from_partitions_)
std::string get_session_id() const
Definition: SessionInfo.h:93
Catalog & getCatalog() const
Definition: SessionInfo.h:75
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1504
#define CHECK(condition)
Definition: Logger.h:291
std::list< std::unique_ptr< NameValueAssign > > options_
Definition: ParserNode.h:1500
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static constexpr ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:423
std::string userLoggable() const
Definition: SysCatalog.cpp:158
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Parser::CopyTableStmt::get_deferred_copy_from_payload ( std::string &  table,
std::string &  file_name,
import_export::CopyParams &  copy_params,
std::string &  partitions 
)
inline

Definition at line 1485 of file ParserNode.h.

References deferred_copy_from_copy_params_, deferred_copy_from_file_name_, deferred_copy_from_partitions_, table_, and was_deferred_copy_from_.

1488  {
1489  table = *table_;
1490  file_name = deferred_copy_from_file_name_;
1491  copy_params = deferred_copy_from_copy_params_;
1492  partitions = deferred_copy_from_partitions_;
1493  was_deferred_copy_from_ = false;
1494  }
std::string deferred_copy_from_partitions_
Definition: ParserNode.h:1505
std::string deferred_copy_from_file_name_
Definition: ParserNode.h:1503
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
import_export::CopyParams deferred_copy_from_copy_params_
Definition: ParserNode.h:1504
bool Parser::CopyTableStmt::get_success ( ) const
inline

Definition at line 1481 of file ParserNode.h.

References success_.

1481 { return success_; }
std::string& Parser::CopyTableStmt::get_table ( ) const
inline

Definition at line 1476 of file ParserNode.h.

References CHECK, and table_.

1476  {
1477  CHECK(table_);
1478  return *table_;
1479  }
std::unique_ptr< std::string > table_
Definition: ParserNode.h:1497
#define CHECK(condition)
Definition: Logger.h:291
bool Parser::CopyTableStmt::was_deferred_copy_from ( ) const
inline

Definition at line 1483 of file ParserNode.h.

References was_deferred_copy_from_.

1483 { return was_deferred_copy_from_; }

Member Data Documentation

std::unique_ptr<std::string> Parser::CopyTableStmt::copy_from_source_pattern_
private

Definition at line 1498 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

import_export::CopyParams Parser::CopyTableStmt::deferred_copy_from_copy_params_
private

Definition at line 1504 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_file_name_
private

Definition at line 1503 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::string Parser::CopyTableStmt::deferred_copy_from_partitions_
private

Definition at line 1505 of file ParserNode.h.

Referenced by execute(), and get_deferred_copy_from_payload().

std::list<std::unique_ptr<NameValueAssign> > Parser::CopyTableStmt::options_
private

Definition at line 1500 of file ParserNode.h.

Referenced by CopyTableStmt(), and execute().

std::unique_ptr<std::string> Parser::CopyTableStmt::return_message

Definition at line 1474 of file ParserNode.h.

Referenced by execute().

bool Parser::CopyTableStmt::success_
private

Definition at line 1499 of file ParserNode.h.

Referenced by execute(), and get_success().

std::unique_ptr<std::string> Parser::CopyTableStmt::table_
private

Definition at line 1497 of file ParserNode.h.

Referenced by CopyTableStmt(), execute(), get_deferred_copy_from_payload(), and get_table().

bool Parser::CopyTableStmt::was_deferred_copy_from_ = false
private

Definition at line 1502 of file ParserNode.h.

Referenced by execute(), get_deferred_copy_from_payload(), and was_deferred_copy_from().


The documentation for this class was generated from the following files: