OmniSciDB  8fa3bf436f
import_export::DataStreamSink Class Reference [abstract]

#include <Importer.h>


Public Member Functions

 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
virtual ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Member Functions

ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
mapd_shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 650 of file Importer.h.
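DataStreamSink is the abstract base of the delimited-data importers (import_export::Importer and import_export::Detector): a subclass supplies importDelimited(), while the protected archivePlumber() handles glob expansion, archive landing and decompression, and eventually calls back into that override. Below is a minimal, purely hypothetical subclass sketch; only the DataStreamSink members used here (the constructor, archivePlumber(), importDelimited()) come from this page, everything else is illustrative.

#include <string>

#include <Importer.h>  // import_export::DataStreamSink, CopyParams, ImportStatus

// Hypothetical subclass for illustration; not part of the OmniSciDB sources.
class RowCountingSink : public import_export::DataStreamSink {
 public:
  RowCountingSink(const import_export::CopyParams& copy_params,
                  const std::string& file_path)
      : DataStreamSink(copy_params, file_path) {}

  // The single pure-virtual member; see the sketch under importDelimited() below.
  import_export::ImportStatus importDelimited(
      const std::string& file_path,
      const bool decompressed,
      const Catalog_Namespace::SessionInfo* session_info) override;

  // Entry point: archivePlumber() expands file_path (glob, s3, archives) and
  // funnels the decompressed bytes back into importDelimited() above.
  import_export::ImportStatus run(const Catalog_Namespace::SessionInfo* session_info) {
    return archivePlumber(session_info);
  }
};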

Constructor & Destructor Documentation

import_export::DataStreamSink::DataStreamSink ( )
inline

Definition at line 652 of file Importer.h.

652 {}
import_export::DataStreamSink::DataStreamSink ( const CopyParams &  copy_params,
const std::string  file_path 
)
inline

Definition at line 653 of file Importer.h.

virtual import_export::DataStreamSink::~DataStreamSink ( )
inline virtual

Definition at line 655 of file Importer.h.

655 {}

Member Function Documentation

ImportStatus import_export::DataStreamSink::archivePlumber ( const Catalog_Namespace::SessionInfo *  session_info)
protected

Definition at line 3415 of file Importer.cpp.

References copy_params, file_path, import_export::CopyParams::file_type, get_filesize(), omnisci::glob(), import_compressed(), import_status_, and total_file_size.

Referenced by import_export::Importer::import(), and import_export::Detector::read_file().

3416  {
3417  // in the generalized importing scheme, by the time we reach here file_path
3418  // may contain a single file path, a url, or a wildcard of file paths.
3419  // see CopyTableStmt::execute.
3420  auto file_paths = omnisci::glob(file_path);
3421  if (file_paths.size() == 0) {
3422  file_paths.push_back(file_path);
3423  }
3424 
3425  // sum up sizes of all local files -- only for local files. if
3426  // file_path is a s3 url, sizes will be obtained via S3Archive.
3427  for (const auto& file_path : file_paths) {
3428    total_file_size += get_filesize(file_path);
3429  }
3430 
3431 #ifdef ENABLE_IMPORT_PARQUET
3432  // s3 parquet goes down a different route because the files are read not with
3433  // libarchive but with the parquet api, and they need to be landed like .7z files.
3434  //
3435  // note: parquet must be explicitly specified by a WITH parameter "parquet='true'",
3436  // because, for example, spark sql users may specify an output url w/o a file
3437  // extension like this:
3438  // df.write
3439  // .mode("overwrite")
3440  // .parquet("s3://bucket/folder/parquet/mydata")
3441  // without the parameter, it means plain or compressed csv files.
3442  // note: .ORC and AVRO files should follow a similar path to Parquet?
3443  if (copy_params.file_type == FileType::PARQUET) {
3444  import_parquet(file_paths, session_info);
3445  } else
3446 #endif
3447  {
3448  import_compressed(file_paths, session_info);
3449  }
3450 
3451  return import_status_;
3452 }
void import_compressed(std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:3848
auto get_filesize(const std::string &file_path)
Definition: Importer.cpp:90
std::vector< std::string > glob(const std::string &pattern)
const std::string file_path
Definition: Importer.h:675
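archivePlumber() first expands file_path as a glob and, when nothing matches (a literal local path, an s3 url, or an unmatched wildcard), falls back to the original string before dispatching to import_parquet() or import_compressed(). The same expansion-with-fallback pattern is sketched below in isolation, with a POSIX glob(3)-based stand-in for omnisci::glob(); the helper names are hypothetical, not OmniSciDB code.

#include <glob.h>

#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for omnisci::glob(), built on POSIX glob(3):
// returns the paths matching a wildcard pattern, or an empty vector.
std::vector<std::string> glob_paths(const std::string& pattern) {
  std::vector<std::string> results;
  glob_t g{};
  if (glob(pattern.c_str(), 0, nullptr, &g) == 0) {
    for (std::size_t i = 0; i < g.gl_pathc; ++i) {
      results.emplace_back(g.gl_pathv[i]);
    }
  }
  globfree(&g);
  return results;
}

// The fallback used by archivePlumber(): if the wildcard expands to nothing,
// keep the original string so the downstream archive code can still try to
// open or land it (e.g. an s3 url has no local glob matches).
std::vector<std::string> expand_paths(const std::string& file_path) {
  auto file_paths = glob_paths(file_path);
  if (file_paths.empty()) {
    file_paths.push_back(file_path);
  }
  return file_paths;
}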


const CopyParams& import_export::DataStreamSink::get_copy_params ( ) const
inline

Definition at line 667 of file Importer.h.

References copy_params.

Referenced by DBHandler::detect_column_types().

667 { return copy_params; }


void import_export::DataStreamSink::import_compressed ( std::vector< std::string > &  file_paths,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 3848 of file Importer.cpp.

References import_export::CopyParams::buffer_size, omnisci::close(), copy_params, file_offsets, file_offsets_mutex, import_export::CopyParams::has_header, import_mutex_, import_status_, importDelimited(), import_export::CopyParams::line_delim, import_export::ImportStatus::load_failed, LOG, import_export::NO_HEADER, p_file, Archive::parse_url(), import_export::CopyParams::plain_text, import_export::ImportStatus::rows_completed, import_export::CopyParams::s3_access_key, import_export::CopyParams::s3_endpoint, import_export::CopyParams::s3_region, import_export::CopyParams::s3_secret_key, import_export::CopyParams::s3_session_token, total_file_size, logger::WARNING, and File_Namespace::write().

Referenced by archivePlumber().

3850  {
3851  // a new requirement is to have one single input stream into
3852  // Importer::importDelimited, so the pipe-related setup needs
3853  // to live in the outermost block.
3854  int fd[2];
3855 #ifdef _WIN32
3856  // For some reason when folly is used to create the pipe, reader can
3857  // read nothing.
3858  auto pipe_res =
3859  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
3860 #else
3861  auto pipe_res = pipe(fd);
3862 #endif
3863  if (pipe_res < 0) {
3864  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
3865  }
3866 #ifndef _WIN32
3867  signal(SIGPIPE, SIG_IGN);
3868 #endif
3869 
3870  std::exception_ptr teptr;
3871  // create a thread to read uncompressed byte stream out of pipe and
3872  // feed into importDelimited()
3873  ImportStatus ret1;
3874  auto th_pipe_reader = std::thread([&]() {
3875  try {
3876  // importDelimited will read from FILE* p_file
3877  if (0 == (p_file = fdopen(fd[0], "r"))) {
3878  throw std::runtime_error(std::string("failed to open a pipe: ") +
3879  strerror(errno));
3880  }
3881 
3882  // in the future, depending on the data type of this uncompressed stream,
3883  // it can be fed into other functions such as importParquet, etc.
3884  ret1 = importDelimited(file_path, true, session_info);
3885 
3886  } catch (...) {
3887  if (!teptr) { // no replace
3888  teptr = std::current_exception();
3889  }
3890  }
3891 
3892  if (p_file) {
3893  fclose(p_file);
3894  }
3895  p_file = 0;
3896  });
3897 
3898  // create a thread to iterate over all files (in all archives) and
3899  // forward the uncompressed byte stream to fd[1], which is
3900  // then fed into importDelimited, importParquet, etc.
3901  auto th_pipe_writer = std::thread([&]() {
3902  std::unique_ptr<S3Archive> us3arch;
3903  bool stop = false;
3904  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
3905  try {
3906  auto file_path = file_paths[fi];
3907  std::unique_ptr<Archive> uarch;
3908  std::map<int, std::string> url_parts;
3909  Archive::parse_url(file_path, url_parts);
3910  const std::string S3_objkey_url_scheme = "s3ok";
3911  if ("file" == url_parts[2] || "" == url_parts[2]) {
3912  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3913  } else if ("s3" == url_parts[2]) {
3914 #ifdef HAVE_AWS_S3
3915  // new a S3Archive with a shared s3client.
3916  // should be safe b/c no wildcard with s3 url
3917  us3arch.reset(new S3Archive(file_path,
3924  us3arch->init_for_read();
3925  total_file_size += us3arch->get_total_file_size();
3926  // don't land all files here; land them one by one in the following iterations
3927  for (const auto& objkey : us3arch->get_objkeys()) {
3928  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
3929  }
3930  continue;
3931 #else
3932  throw std::runtime_error("AWS S3 support not available");
3933 #endif // HAVE_AWS_S3
3934  } else if (S3_objkey_url_scheme == url_parts[2]) {
3935 #ifdef HAVE_AWS_S3
3936  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
3937  auto file_path =
3938  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
3939  if (0 == file_path.size()) {
3940  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
3941  }
3942  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
3943  // file not removed until file closed
3944  us3arch->vacuum(objkey);
3945 #else
3946  throw std::runtime_error("AWS S3 support not available");
3947 #endif // HAVE_AWS_S3
3948  }
3949 #if 0 // TODO(ppan): implement and enable any other archive class
3950  else
3951  if ("hdfs" == url_parts[2])
3952  uarch.reset(new HdfsArchive(file_path));
3953 #endif
3954  else {
3955  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
3956  }
3957 
3958  // init the archive for read
3959  auto& arch = *uarch;
3960 
3961  // at this point, the archive at the url should be ready to be read, unarchived,
3962  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
3963  const void* buf;
3964  size_t size;
3965  bool just_saw_archive_header;
3966  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
3967  bool first_text_header_skipped = false;
3968  // start reading uncompressed bytes of this archive from libarchive
3969  // note! this archive may contain more than one file!
3970  file_offsets.push_back(0);
3971  size_t num_block_read = 0;
3972  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
3973  bool insert_line_delim_after_this_file = false;
3974  while (!stop) {
3975  int64_t offset{-1};
3976  auto ok = arch.read_data_block(&buf, &size, &offset);
3977  // can't use (uncompressed) size, so track (max) file offset.
3978  // also we want to capture offset even on e.o.f.
3979  if (offset > 0) {
3980  std::unique_lock<std::mutex> lock(file_offsets_mutex);
3981  file_offsets.back() = offset;
3982  }
3983  if (!ok) {
3984  break;
3985  }
3986  // one subtle point here is that we now concatenate all files
3987  // into a single FILE stream and call importDelimited on it
3988  // only once. this would make it assume that only one header
3989  // line comes with this 'single' stream, while in fact we may
3990  // have one header line for each of the files.
3991  // so we need to skip header lines here instead of in importDelimited.
3992  const char* buf2 = (const char*)buf;
3993  int size2 = size;
3994  if (copy_params.has_header != ImportHeaderRow::NO_HEADER &&
3995  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
3996  while (size2-- > 0) {
3997  if (*buf2++ == copy_params.line_delim) {
3998  break;
3999  }
4000  }
4001  if (size2 <= 0) {
4002  LOG(WARNING) << "No line delimiter in block." << std::endl;
4003  } else {
4004  just_saw_archive_header = false;
4005  first_text_header_skipped = true;
4006  }
4007  }
4008  // On very rare occasions the write pipe somehow operates in a mode similar
4009  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
4010  // means blocking mode. With such unreliable blocking behavior, a possible fix
4011  // is to loop writing until no bytes are left, otherwise we get the annoying
4012  // `failed to write pipe: Success`...
4013  if (size2 > 0) {
4014  int nremaining = size2;
4015  while (nremaining > 0) {
4016  // try to write the entire remainder of the buffer to the pipe
4017  int nwritten = write(fd[1], buf2, nremaining);
4018  // how did we do?
4019  if (nwritten < 0) {
4020  // something bad happened
4021  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4022  // ignore these, assume nothing written, try again
4023  nwritten = 0;
4024  } else {
4025  // a real error
4026  throw std::runtime_error(
4027  std::string("failed or interrupted write to pipe: ") +
4028  strerror(errno));
4029  }
4030  } else if (nwritten == nremaining) {
4031  // we wrote everything; we're done
4032  break;
4033  }
4034  // only wrote some (or nothing), try again
4035  nremaining -= nwritten;
4036  buf2 += nwritten;
4037  // no exception when too many rows are rejected
4038  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4039  if (import_status_.load_failed) {
4040  stop = true;
4041  break;
4042  }
4043  }
4044  // check whether this file (buf, of length size) ended with a line delim
4045  if (size > 0) {
4046  const char* plast = static_cast<const char*>(buf) + (size - 1);
4047  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4048  }
4049  }
4050  ++num_block_read;
4051  }
4052 
4053  // if that file didn't end with a line delim, we insert one here to terminate
4054  // that file's stream. use a loop for the same reason as above.
4055  if (insert_line_delim_after_this_file) {
4056  while (true) {
4057  // write the delim char to the pipe
4058  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4059  // how did we do?
4060  if (nwritten < 0) {
4061  // something bad happened
4062  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4063  // ignore these, assume nothing written, try again
4064  nwritten = 0;
4065  } else {
4066  // a real error
4067  throw std::runtime_error(
4068  std::string("failed or interrupted write to pipe: ") +
4069  strerror(errno));
4070  }
4071  } else if (nwritten == 1) {
4072  // we wrote it; we're done
4073  break;
4074  }
4075  }
4076  }
4077  }
4078  } catch (...) {
4079  // when the import is aborted because of too many data errors or because
4080  // detection has ended, any exception thrown by the s3 sdk or libarchive
4081  // is okay and should be suppressed.
4082  mapd_shared_lock<mapd_shared_mutex> read_lock(import_mutex_);
4083  if (import_status_.load_failed) {
4084  break;
4085  }
4086  if (import_status_.rows_completed > 0) {
4087  if (nullptr != dynamic_cast<Detector*>(this)) {
4088  break;
4089  }
4090  }
4091  if (!teptr) { // no replace
4092  teptr = std::current_exception();
4093  }
4094  break;
4095  }
4096  }
4097  // close writer end
4098  close(fd[1]);
4099  });
4100 
4101  th_pipe_reader.join();
4102  th_pipe_writer.join();
4103 
4104  // rethrow any exception that happened before this point
4105  if (teptr) {
4106  std::rethrow_exception(teptr);
4107  }
4108 }
std::string s3_secret_key
Definition: CopyParams.h:63
#define LOG(tag)
Definition: Logger.h:194
mapd_shared_mutex import_mutex_
Definition: Importer.h:678
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
ImportHeaderRow has_header
Definition: CopyParams.h:48
virtual ImportStatus importDelimited(const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
void close(const int fd)
Definition: omnisci_fs.cpp:68
std::string s3_session_token
Definition: CopyParams.h:64
mapd_shared_lock< mapd_shared_mutex > read_lock
std::string s3_access_key
Definition: CopyParams.h:62
std::vector< size_t > file_offsets
Definition: Importer.h:680
static void parse_url(const std::string url, std::map< int, std::string > &url_parts)
Definition: Archive.h:153
const std::string file_path
Definition: Importer.h:675
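The core of import_compressed() is a pipe shared by two threads: the writer thread walks the archives and pushes decompressed bytes into fd[1], while the reader thread wraps fd[0] in a FILE* (stored in p_file) and hands it to importDelimited(). A stripped-down, standalone sketch of that pattern follows, including the partial-write retry loop used above; the CSV payload and all names are illustrative and POSIX-only, not OmniSciDB code.

#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <string>
#include <thread>

int main() {
  int fd[2];
  if (pipe(fd) < 0) {
    std::perror("pipe");
    return 1;
  }

  // Reader: wrap the read end in a FILE* (the role p_file plays above) and
  // consume it like any stdio stream -- this is all importDelimited() sees.
  std::thread reader([&] {
    FILE* in = fdopen(fd[0], "r");
    char line[256];
    while (std::fgets(line, sizeof(line), in)) {
      std::printf("row: %s", line);
    }
    std::fclose(in);  // also closes fd[0]
  });

  // Writer: push bytes into the write end, retrying partial writes the same
  // way the loops above do, then close it so the reader sees EOF.
  std::thread writer([&] {
    const std::string csv = "1,a\n2,b\n3,c\n";
    const char* p = csv.data();
    std::size_t remaining = csv.size();
    while (remaining > 0) {
      ssize_t n = write(fd[1], p, remaining);
      if (n < 0) {
        if (errno == EINTR || errno == EAGAIN) {
          continue;  // treat as "nothing written" and try again
        }
        break;  // a real error; a sketch just gives up here
      }
      p += n;
      remaining -= static_cast<std::size_t>(n);
    }
    close(fd[1]);
  });

  reader.join();
  writer.join();
  return 0;
}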


virtual ImportStatus import_export::DataStreamSink::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo *  session_info 
)
pure virtual

Implemented in import_export::Importer, and import_export::Detector.

Referenced by import_compressed().

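import_compressed() opens p_file on the read end of its pipe and then calls this member with decompressed == true (lines 3876-3884 above), so an override typically reads the already-decompressed byte stream from p_file instead of opening file_path itself. Below is a hedged sketch of such an override for the hypothetical RowCountingSink from the Detailed Description; the decompressed == false branch is an assumption about a caller handing in a plain local file, not something this page documents.

#include <cstdio>
#include <string>

import_export::ImportStatus RowCountingSink::importDelimited(
    const std::string& file_path,
    const bool decompressed,
    const Catalog_Namespace::SessionInfo* /*session_info*/) {
  // decompressed == true: import_compressed() has already connected p_file
  // to the pipe carrying the unarchived, uncompressed bytes.
  FILE* in = decompressed ? p_file : std::fopen(file_path.c_str(), "rb");
  if (!in) {
    import_status_.load_failed = true;
    return import_status_;
  }
  int ch;
  while ((ch = std::fgetc(in)) != EOF) {
    if (ch == copy_params.line_delim) {
      ++import_status_.rows_completed;  // one delimited row finished
    }
  }
  if (!decompressed) {
    std::fclose(in);  // import_compressed() closes p_file itself
  }
  return import_status_;
}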

Member Data Documentation

std::vector<size_t> import_export::DataStreamSink::file_offsets
protected

Definition at line 680 of file Importer.h.

Referenced by import_compressed(), and import_export::Importer::importDelimited().

std::mutex import_export::DataStreamSink::file_offsets_mutex
protected

Definition at line 681 of file Importer.h.

Referenced by import_compressed(), and import_export::Importer::importDelimited().

const std::string import_export::DataStreamSink::file_path
protected

Definition at line 675 of file Importer.h.

Referenced by archivePlumber(), and import_export::Importer::Importer().

size_t import_export::DataStreamSink::total_file_size {0}
protected

Referenced by archivePlumber(), and import_compressed().
The documentation for this class was generated from the following files:

Importer.h
Importer.cpp