OmniSciDB  04ee39c94c
Importer_NS::DataStreamSink Class Reference (abstract)

#include <Importer.h>

Inherited by Importer_NS::Detector and Importer_NS::Importer.

Public Member Functions

 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
virtual ImportStatus importDelimited (const std::string &file_path, const bool decompressed)=0
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths)
 

Protected Member Functions

ImportStatus archivePlumber ()
 

Protected Attributes

CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status
 
bool load_failed = false
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 734 of file Importer.h.
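
DataStreamSink is the abstract base class shared by Importer_NS::Importer and Importer_NS::Detector. It holds the copy parameters, the input FILE* (p_file), and the shared import bookkeeping, and it routes every input (local file, wildcard, or S3 url) through archivePlumber() into the pure virtual importDelimited(). The sketch below shows the shape of a concrete sink; ToyCsvSink, its row counting, and the run() wrapper are hypothetical illustrations, only the inherited members and signatures come from Importer.h.

#include <cstdio>
#include <string>
#include "Importer.h"

// Hypothetical sink, for illustration only: it counts delimited rows
// from the decompressed byte stream. Importer and Detector are the
// real implementations.
class ToyCsvSink : public Importer_NS::DataStreamSink {
 public:
  ToyCsvSink(const Importer_NS::CopyParams& cp, const std::string& path)
      : DataStreamSink(cp, path) {}

  // import_compressed() opens the read end of its pipe as p_file and
  // then calls this exactly once per import.
  Importer_NS::ImportStatus importDelimited(const std::string& /*file_path*/,
                                            const bool /*decompressed*/) override {
    char buf[1 << 15];
    size_t n;
    while (p_file && (n = std::fread(buf, 1, sizeof buf, p_file)) > 0) {
      for (size_t i = 0; i < n; ++i) {
        if (buf[i] == copy_params.line_delim) {
          ++import_status.rows_completed;
        }
      }
    }
    return import_status;
  }

  // archivePlumber() is protected; expose it for the sketch.
  Importer_NS::ImportStatus run() { return archivePlumber(); }
};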

Constructor & Destructor Documentation

◆ DataStreamSink() [1/2]

Importer_NS::DataStreamSink::DataStreamSink ( )
inline

Definition at line 736 of file Importer.h.

736 {}

◆ DataStreamSink() [2/2]

Importer_NS::DataStreamSink::DataStreamSink ( const CopyParams &  copy_params,
const std::string  file_path 
)
inline

Definition at line 737 of file Importer.h.

738  : copy_params(copy_params), file_path(file_path) {}
const std::string file_path
Definition: Importer.h:753

◆ ~DataStreamSink()

virtual Importer_NS::DataStreamSink::~DataStreamSink ( )
inlinevirtual

Definition at line 739 of file Importer.h.

739 {}

Member Function Documentation

◆ archivePlumber()

ImportStatus Importer_NS::DataStreamSink::archivePlumber ( )
protected

Definition at line 3058 of file Importer.cpp.

References arrow_throw_if(), Importer_NS::Importer::buffer, anonymous_namespace{ExecuteTest.cpp}::c(), copy_params, cpu_threads(), Importer_NS::TypedImportBuffer::del_values(), Importer_NS::CopyParams::delimiter, logger::ERROR, Importer_NS::CopyParams::escape, measure< TimeT >::execution(), Importer_NS::BadRowsTracker::file_name, file_offsets, file_offsets_mutex, file_path, Importer_NS::CopyParams::file_type, ThreadController_NS::SimpleThreadController< FutureReturnType >::finish(), TestHelpers::g(), Importer_NS::Importer::get_column_descs(), get_filesize(), Importer_NS::Importer::getLoader(), Importer_NS::HAS_HEADER, Importer_NS::CopyParams::has_header, Importer_NS::Importer::import_buffers_vec, import_compressed(), import_status, Importer_NS::BadRowsTracker::importer, logger::INFO, kARRAY, kBIGINT, kBOOLEAN, kCHAR, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kLINESTRING, kMULTIPOLYGON, kNUMERIC, kPOINT, kPOLYGON, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, Importer_NS::CopyParams::line_delim, Importer_NS::Importer::load(), load_failed, Importer_NS::ImportStatus::load_truncated, Importer_NS::Importer::loader, LOG, mapd_glob(), Importer_NS::CopyParams::max_reject, Importer_NS::Importer::max_threads, num_rows, Archive::parse_url(), Importer_NS::CopyParams::plain_text, Importer_NS::CopyParams::quote, Importer_NS::CopyParams::quoted, Importer_NS::BadRowsTracker::row_group, Importer_NS::BadRowsTracker::rows, Importer_NS::ImportStatus::rows_completed, Importer_NS::ImportStatus::rows_estimated, Importer_NS::ImportStatus::rows_rejected, Importer_NS::CopyParams::s3_access_key, Importer_NS::CopyParams::s3_endpoint, Importer_NS::CopyParams::s3_region, Importer_NS::CopyParams::s3_secret_key, ThreadController_NS::SimpleThreadController< FutureReturnType >::startThread(), Importer_NS::CopyParams::threads, to_string(), total_file_size, run-benchmark-import::type, anonymous_namespace{ArrowImporter.h}::value_getter(), and VLOG.

Referenced by Importer_NS::Importer::import(), and Importer_NS::Detector::read_file().

3058  {
3059  // in the generalized importing scheme, by the time we get here, file_path
3060  // may contain a single file path, a url, or a wildcard of file paths.
3061  // see CopyTableStmt::execute.
3062  auto file_paths = mapd_glob(file_path);
3063  if (file_paths.size() == 0) {
3064  file_paths.push_back(file_path);
3065  }
3066 
3067  // sum up sizes of all local files -- only for local files. if
3068  // file_path is an s3 url, sizes will be obtained via S3Archive.
3069  for (const auto& file_path : file_paths) {
3070  total_file_size += get_filesize(file_path);
3071  }
3072 
3073 #ifdef ENABLE_IMPORT_PARQUET
3074  // s3 parquet goes a different route because the files do not use libarchive
3075  // but the parquet api, and they need to be landed like .7z files.
3076  //
3077  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3078  // because, for example, spark sql users may specify an output url w/o a file
3079  // extension, like this:
3080  // df.write
3081  // .mode("overwrite")
3082  // .parquet("s3://bucket/folder/parquet/mydata")
3083  // without the parameter, it means plain or compressed csv files.
3084  // note: .ORC and AVRO files should follow a similar path to Parquet?
3085  if (copy_params.file_type == FileType::PARQUET) {
3086  import_parquet(file_paths);
3087  } else
3088 #endif
3089  {
3090  import_compressed(file_paths);
3091  }
3092  return import_status;
3093 }
void import_compressed(std::vector< std::string > &file_paths)
Definition: Importer.cpp:3454
auto get_filesize(const std::string &file_path)
Definition: Importer.cpp:74
std::vector< std::string > mapd_glob(const std::string &pattern)
Definition: mapd_glob.cpp:22
ImportStatus import_status
Definition: Importer.h:755
const std::string file_path
Definition: Importer.h:753
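
The first step of archivePlumber() is a glob-with-fallback: expand the wildcard, and if nothing matches, keep the argument as a literal path so urls and S3 keys pass through untouched. A standalone approximation of that idiom using POSIX glob(3) is sketched below; glob_or_literal is a hypothetical name, and this is not the mapd_glob implementation, only the same pattern.

#include <glob.h>
#include <string>
#include <vector>

// Expand a wildcard pattern; fall back to the literal argument when
// nothing matches (e.g. the "pattern" is really a url or S3 key) --
// the same fallback archivePlumber applies.
std::vector<std::string> glob_or_literal(const std::string& pattern) {
  std::vector<std::string> paths;
  glob_t g;
  if (::glob(pattern.c_str(), 0, nullptr, &g) == 0) {
    for (size_t i = 0; i < g.gl_pathc; ++i) {
      paths.emplace_back(g.gl_pathv[i]);
    }
    globfree(&g);
  }
  if (paths.empty()) {
    paths.push_back(pattern);
  }
  return paths;
}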

◆ get_copy_params()

const CopyParams& Importer_NS::DataStreamSink::get_copy_params ( ) const
inline

Definition at line 746 of file Importer.h.

Referenced by MapDHandler::detect_column_types().

746 { return copy_params; }

◆ import_compressed()

void Importer_NS::DataStreamSink::import_compressed ( std::vector< std::string > &  file_paths)

Definition at line 3454 of file Importer.cpp.

References File_Namespace::close(), copy_params, file_offsets, file_offsets_mutex, Importer_NS::CopyParams::has_header, import_status, Importer_NS::Importer::importDelimited(), Importer_NS::CopyParams::line_delim, Importer_NS::ImportStatus::load_truncated, LOG, Importer_NS::NO_HEADER, p_file, Archive::parse_url(), Importer_NS::CopyParams::plain_text, Importer_NS::ImportStatus::rows_completed, Importer_NS::CopyParams::s3_access_key, Importer_NS::CopyParams::s3_endpoint, Importer_NS::CopyParams::s3_region, Importer_NS::CopyParams::s3_secret_key, total_file_size, logger::WARNING, and File_Namespace::write().

Referenced by archivePlumber().

3454  {
3455  // a new requirement is to have one single input stream into
3456  // Importer::importDelimited, so we need to move the pipe-related
3457  // stuff to the outermost block.
3458  int fd[2];
3459  if (pipe(fd) < 0) {
3460  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3461  }
3462  signal(SIGPIPE, SIG_IGN);
3463 
3464  std::exception_ptr teptr;
3465  // create a thread to read uncompressed byte stream out of pipe and
3466  // feed into importDelimited()
3467  ImportStatus ret;
3468  auto th_pipe_reader = std::thread([&]() {
3469  try {
3470  // importDelimited will read from FILE* p_file
3471  if (0 == (p_file = fdopen(fd[0], "r"))) {
3472  throw std::runtime_error(std::string("failed to open a pipe: ") +
3473  strerror(errno));
3474  }
3475 
3476  // in the future, depending on the data types of this uncompressed stream,
3477  // it can be fed into other functions such as importParquet, etc.
3478  ret = importDelimited(file_path, true);
3479  } catch (...) {
3480  if (!teptr) { // no replace
3481  teptr = std::current_exception();
3482  }
3483  }
3484 
3485  if (p_file) {
3486  fclose(p_file);
3487  }
3488  p_file = 0;
3489  });
3490 
3491  // create a thread to iterate over all files (in all archives) and
3492  // forward the uncompressed byte stream to fd[1], which is
3493  // then fed into importDelimited, importParquet, etc.
3494  auto th_pipe_writer = std::thread([&]() {
3495  std::unique_ptr<S3Archive> us3arch;
3496  bool stop = false;
3497  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3498  try {
3499  auto file_path = file_paths[fi];
3500  std::unique_ptr<Archive> uarch;
3501  std::map<int, std::string> url_parts;
3502  Archive::parse_url(file_path, url_parts);
3503  const std::string S3_objkey_url_scheme = "s3ok";
3504  if ("file" == url_parts[2] || "" == url_parts[2]) {
3505  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3506  } else if ("s3" == url_parts[2]) {
3507 #ifdef HAVE_AWS_S3
3508  // create a new S3Archive with a shared s3client.
3509  // should be safe b/c there is no wildcard with an s3 url
3510  us3arch.reset(new S3Archive(file_path,
3511  copy_params.s3_access_key,
3512  copy_params.s3_secret_key,
3513  copy_params.s3_region,
3514  copy_params.s3_endpoint,
3515  copy_params.plain_text));
3516  us3arch->init_for_read();
3517  total_file_size += us3arch->get_total_file_size();
3518  // don't land all files here; land them one by one in the following iterations
3519  for (const auto& objkey : us3arch->get_objkeys()) {
3520  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3521  }
3522  continue;
3523 #else
3524  throw std::runtime_error("AWS S3 support not available");
3525 #endif // HAVE_AWS_S3
3526  } else if (S3_objkey_url_scheme == url_parts[2]) {
3527 #ifdef HAVE_AWS_S3
3528  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3529  auto file_path =
3530  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3531  if (0 == file_path.size()) {
3532  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3533  }
3534  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3535  // file not removed until file closed
3536  us3arch->vacuum(objkey);
3537 #else
3538  throw std::runtime_error("AWS S3 support not available");
3539 #endif // HAVE_AWS_S3
3540  }
3541 #if 0 // TODO(ppan): implement and enable any other archive class
3542  else
3543  if ("hdfs" == url_parts[2])
3544  uarch.reset(new HdfsArchive(file_path));
3545 #endif
3546  else {
3547  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3548  }
3549 
3550  // init the archive for read
3551  auto& arch = *uarch;
3552 
3553  // at this point, the archive at the url should be ready to be read, unarchived
3554  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3555  const void* buf;
3556  size_t size;
3557  bool just_saw_archive_header;
3558  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3559  bool first_text_header_skipped = false;
3560  // start reading uncompressed bytes of this archive from libarchive
3561  // note! this archive may contain more than one file!
3562  file_offsets.push_back(0);
3563  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3564  bool insert_line_delim_after_this_file = false;
3565  while (!stop) {
3566  int64_t offset{-1};
3567  auto ok = arch.read_data_block(&buf, &size, &offset);
3568  // can't use (uncompressed) size, so track (max) file offset.
3569  // also we want to capture offset even on e.o.f.
3570  if (offset > 0) {
3571  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3572  file_offsets.back() = offset;
3573  }
3574  if (!ok) {
3575  break;
3576  }
3577  // one subtle point here is that we now concatenate all files
3578  // into a single FILE stream, with which we call importDelimited
3579  // only once. this would lead it to assume that only one
3580  // header line comes with this 'single' stream, while actually
3581  // we may have one header line for each of the files.
3582  // so we need to skip header lines here instead of in importDelimited.
3583  const char* buf2 = (const char*)buf;
3584  int size2 = size;
3585  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3586  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3587  while (size2-- > 0) {
3588  if (*buf2++ == copy_params.line_delim) {
3589  break;
3590  }
3591  }
3592  if (size2 <= 0) {
3593  LOG(WARNING) << "No line delimiter in block." << std::endl;
3594  }
3595  just_saw_archive_header = false;
3596  first_text_header_skipped = true;
3597  }
3598  // On very rare occasions the write pipe somehow operates in a mode similar to
3599  // non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which means
3600  // blocking mode. On such an unreliable blocking mode, a possible fix is to
3601  // loop writing till no bytes are left, otherwise we get the annoying `failed
3602  // to write pipe: Success`...
3603  if (size2 > 0) {
3604  int nremaining = size2;
3605  while (nremaining > 0) {
3606  // try to write the entire remainder of the buffer to the pipe
3607  int nwritten = write(fd[1], buf2, nremaining);
3608  // how did we do?
3609  if (nwritten < 0) {
3610  // something bad happened
3611  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3612  // ignore these, assume nothing written, try again
3613  nwritten = 0;
3614  } else {
3615  // a real error
3616  throw std::runtime_error(
3617  std::string("failed or interrupted write to pipe: ") +
3618  strerror(errno));
3619  }
3620  } else if (nwritten == nremaining) {
3621  // we wrote everything; we're done
3622  break;
3623  }
3624  // only wrote some (or nothing), try again
3625  nremaining -= nwritten;
3626  buf2 += nwritten;
3627  // no exception when too many rejected
3628  // @simon.eves how would this get set? from the other thread? mutex
3629  // needed?
3630  if (import_status.load_truncated) {
3631  stop = true;
3632  break;
3633  }
3634  }
3635  // check that this file (buf for size) ended with a line delim
3636  if (size > 0) {
3637  const char* plast = static_cast<const char*>(buf) + (size - 1);
3638  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
3639  }
3640  }
3641  }
3642  // if that file didn't end with a line delim, we insert one here to terminate
3643  // that file's stream. use a loop for the same reason as above.
3644  if (insert_line_delim_after_this_file) {
3645  while (true) {
3646  // write the delim char to the pipe
3647  int nwritten = write(fd[1], &copy_params.line_delim, 1);
3648  // how did we do?
3649  if (nwritten < 0) {
3650  // something bad happened
3651  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
3652  // ignore these, assume nothing written, try again
3653  nwritten = 0;
3654  } else {
3655  // a real error
3656  throw std::runtime_error(
3657  std::string("failed or interrupted write to pipe: ") +
3658  strerror(errno));
3659  }
3660  } else if (nwritten == 1) {
3661  // we wrote it; we're done
3662  break;
3663  }
3664  }
3665  }
3666  }
3667  } catch (...) {
3668  // when the import is aborted because of too many data errors or because a
3669  // detection has ended, any exception thrown by the s3 sdk or libarchive is
3670  // okay and should be suppressed.
3671  mapd_shared_lock<mapd_shared_mutex> read_lock(status_mutex);
3672  if (import_status.load_truncated) {
3673  break;
3674  }
3675  if (import_status.rows_completed > 0) {
3676  if (nullptr != dynamic_cast<Detector*>(this)) {
3677  break;
3678  }
3679  }
3680  if (!teptr) { // no replace
3681  teptr = std::current_exception();
3682  }
3683  break;
3684  }
3685  }
3686  // close writer end
3687  close(fd[1]);
3688  });
3689 
3690  th_pipe_reader.join();
3691  th_pipe_writer.join();
3692 
3693  // rethrow any exception that happened before this point
3694  if (teptr) {
3695  std::rethrow_exception(teptr);
3696  }
3697 }
std::vector< size_t > file_offsets
Definition: Importer.h:758
#define LOG(tag)
Definition: Logger.h:182
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed)=0
static mapd_shared_mutex status_mutex
Definition: Importer.cpp:143
std::string s3_access_key
Definition: Importer.h:115
ImportHeaderRow has_header
Definition: Importer.h:101
std::string s3_endpoint
Definition: Importer.h:118
std::mutex file_offsets_mutex
Definition: Importer.h:759
std::string s3_region
Definition: Importer.h:117
ImportStatus import_status
Definition: Importer.h:755
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:121
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:102
std::string s3_secret_key
Definition: Importer.h:116
const std::string file_path
Definition: Importer.h:753
static void parse_url(const std::string url, std::map< int, std::string > &url_parts)
Definition: Archive.h:154
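
Stripped of the S3 and libarchive handling, import_compressed() is built on one mechanism: a pipe whose read end is wrapped with fdopen() and consumed by a reader thread (standing in for importDelimited()), while a writer thread pushes bytes through a retry loop that absorbs EINTR/EAGAIN. The self-contained sketch below reproduces only that skeleton; write_all and the sample data are invented for illustration, and unlike the real code it reports errors with a return value instead of forwarding exceptions through a std::exception_ptr.

#include <cerrno>
#include <csignal>
#include <cstdio>
#include <thread>
#include <unistd.h>

// Write all of buf to fd, retrying on EINTR/EAGAIN -- the same retry
// loop import_compressed() uses against flaky pipe semantics.
static bool write_all(int fd, const char* buf, size_t n) {
  while (n > 0) {
    ssize_t w = ::write(fd, buf, n);
    if (w < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;  // treat as "nothing written" and try again
      }
      return false;  // a real error (the original throws here)
    }
    buf += w;
    n -= static_cast<size_t>(w);
  }
  return true;
}

int main() {
  std::signal(SIGPIPE, SIG_IGN);  // writer sees EPIPE instead of dying
  int fd[2];
  if (pipe(fd) < 0) {
    return 1;
  }

  // Reader: wrap the read end as FILE* and consume lines, the way
  // importDelimited() consumes p_file.
  std::thread reader([&] {
    FILE* in = fdopen(fd[0], "r");
    char line[256];
    while (in && std::fgets(line, sizeof line, in)) {
      std::fputs(line, stdout);
    }
    if (in) {
      std::fclose(in);  // also closes fd[0]
    }
  });

  // Writer: feed bytes, then close to signal EOF to the reader.
  std::thread writer([&] {
    const char data[] = "row1\nrow2\n";
    write_all(fd[1], data, sizeof data - 1);
    ::close(fd[1]);
  });

  reader.join();
  writer.join();
  return 0;
}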

◆ importDelimited()

virtual ImportStatus Importer_NS::DataStreamSink::importDelimited ( const std::string &  file_path,
const bool  decompressed 
)
pure virtual

Member Data Documentation

◆ copy_params

CopyParams Importer_NS::DataStreamSink::copy_params
protected

◆ file_offsets

std::vector<size_t> Importer_NS::DataStreamSink::file_offsets
protected

Definition at line 758 of file Importer.h.

◆ file_offsets_mutex

std::mutex Importer_NS::DataStreamSink::file_offsets_mutex
protected

Definition at line 759 of file Importer.h.

◆ file_path

const std::string Importer_NS::DataStreamSink::file_path
protected

Definition at line 753 of file Importer.h.

◆ import_status

ImportStatus Importer_NS::DataStreamSink::import_status
protected

Definition at line 755 of file Importer.h.

◆ load_failed

bool Importer_NS::DataStreamSink::load_failed = false
protected

◆ p_file

FILE* Importer_NS::DataStreamSink::p_file = nullptr
protected

◆ total_file_size

size_t Importer_NS::DataStreamSink::total_file_size {0}
protected

The documentation for this class was generated from the following files:

Importer.h
Importer.cpp