OmniSciDB  c1a53651b2
import_export::DataStreamSink Class Reference [abstract]

#include <Importer.h>

Inheritance diagram for import_export::DataStreamSink:
Collaboration diagram for import_export::DataStreamSink:

Public Member Functions

 DataStreamSink ()
 
 DataStreamSink (const CopyParams &copy_params, const std::string file_path)
 
virtual ~DataStreamSink ()
 
virtual ImportStatus importDelimited (const std::string &file_path, const bool decompressed, const Catalog_Namespace::SessionInfo *session_info)=0
 
const CopyParams & get_copy_params () const
 
void import_compressed (std::vector< std::string > &file_paths, const Catalog_Namespace::SessionInfo *session_info)
 

Protected Member Functions

ImportStatus archivePlumber (const Catalog_Namespace::SessionInfo *session_info)
 

Protected Attributes

CopyParams copy_params
 
const std::string file_path
 
FILE * p_file = nullptr
 
ImportStatus import_status_
 
heavyai::shared_mutex import_mutex_
 
size_t total_file_size {0}
 
std::vector< size_t > file_offsets
 
std::mutex file_offsets_mutex
 

Detailed Description

Definition at line 687 of file Importer.h.

Constructor & Destructor Documentation

import_export::DataStreamSink::DataStreamSink ( )
inline

Definition at line 689 of file Importer.h.

689 {}
import_export::DataStreamSink::DataStreamSink ( const CopyParams &  copy_params,
const std::string  file_path 
)
inline

Definition at line 690 of file Importer.h.

virtual import_export::DataStreamSink::~DataStreamSink ( )
inlinevirtual

Definition at line 692 of file Importer.h.

692 {}

Member Function Documentation

ImportStatus import_export::DataStreamSink::archivePlumber ( const Catalog_Namespace::SessionInfo *  session_info)
protected

Definition at line 3638 of file Importer.cpp.

References copy_params, file_path, import_export::CopyParams::file_sort_order_by, import_export::CopyParams::file_sort_regex, get_filesize(), import_compressed(), import_status_, import_export::kParquetFile, shared::local_glob_filter_sort_files(), import_export::CopyParams::regex_path_filter, import_export::CopyParams::source_type, total_file_size, and shared::validate_sort_options().

Referenced by import_export::Importer::import(), and import_export::Detector::read_file().

3639  {
3640  // in generalized importing scheme, reaching here file_path may
3641  // contain a file path, a url or a wildcard of file paths.
3642  // see CopyTableStmt::execute.
3643 
3644  std::vector<std::string> file_paths;
3645  try {
3650  file_paths = shared::local_glob_filter_sort_files(file_path, options);
3651  } catch (const shared::FileNotFoundException& e) {
3652  // After finding no matching files locally, file_path may still be an s3 url
3653  file_paths.push_back(file_path);
3654  }
3655 
3656  // sum up sizes of all local files -- only for local files. if
3657  // file_path is a s3 url, sizes will be obtained via S3Archive.
3658  for (const auto& file_path : file_paths) {
3660  }
3661 
3662  // s3 parquet goes a different route because the files do not use libarchive
3663  // but the parquet api, and they need to be landed like .7z files.
3664  //
3665  // note: parquet must be explicitly specified by a WITH parameter
3666  // "source_type='parquet_file'", because for example spark sql users may specify a
3667  // output url w/o file extension like this:
3668  // df.write
3669  // .mode("overwrite")
3670  // .parquet("s3://bucket/folder/parquet/mydata")
3671  // without the parameter, it means plain or compressed csv files.
3672  // note: .ORC and AVRO files should follow a similar path to Parquet?
3674 #ifdef ENABLE_IMPORT_PARQUET
3675  import_parquet(file_paths, session_info);
3676 #else
3677  throw std::runtime_error("Parquet not supported!");
3678 #endif
3679  } else {
3680  import_compressed(file_paths, session_info);
3681  }
3682 
3683  return import_status_;
3684 }
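
For orientation, the control flow above (try to expand the path locally, fall back to treating it as a remote URL, and sum up local file sizes) can be sketched in isolation. The helper below is hypothetical: it uses std::filesystem rather than shared::local_glob_filter_sort_files and ignores the regex filter and sort options, so it only mirrors the shape of archivePlumber, not its actual implementation.

#include <cstddef>
#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

// Hypothetical helper that mirrors the shape of archivePlumber(): expand a
// local directory or single file if one exists, otherwise pass the string
// through untouched so a later stage can treat it as a remote (e.g. S3) URL.
// Local file sizes are summed on the way, like total_file_size above.
std::vector<std::string> expand_local_paths(const std::string& file_path,
                                            std::size_t& total_file_size) {
  namespace fs = std::filesystem;
  std::vector<std::string> file_paths;
  std::error_code ec;
  if (fs::is_directory(file_path, ec)) {
    for (const auto& entry : fs::directory_iterator(file_path)) {
      if (entry.is_regular_file()) {
        file_paths.push_back(entry.path().string());
        total_file_size += entry.file_size();
      }
    }
  } else if (fs::is_regular_file(file_path, ec)) {
    file_paths.push_back(file_path);
    total_file_size += fs::file_size(file_path);
  } else {
    // no local match: keep the original string; its size (if it is a URL)
    // is discovered later, the way S3Archive does in the real code
    file_paths.push_back(file_path);
  }
  return file_paths;
}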


const CopyParams& import_export::DataStreamSink::get_copy_params ( ) const
inline

Definition at line 704 of file Importer.h.

References copy_params.

Referenced by DBHandler::detect_column_types().

704 { return copy_params; }


void import_export::DataStreamSink::import_compressed ( std::vector< std::string > &  file_paths,
const Catalog_Namespace::SessionInfo *  session_info 
)

Definition at line 4164 of file Importer.cpp.

References heavyai::close(), file_offsets, file_offsets_mutex, import_mutex_, import_status_, importDelimited(), import_export::kImportRowLimit, import_export::kNoHeader, import_export::ImportStatus::load_failed, LOG, p_file, Archive::parse_url(), import_export::ImportStatus::rows_completed, total_file_size, logger::WARNING, and File_Namespace::write().

Referenced by archivePlumber().

4166  {
4167  // a new requirement is to have one single input stream into
4168  // Importer::importDelimited, so we need to move the pipe-related
4169  // stuff to the outermost block.
4170  int fd[2];
4171 #ifdef _WIN32
4172  // For some reason, when folly is used to create the pipe, the reader can
4173  // read nothing.
4174  auto pipe_res =
4175  _pipe(fd, static_cast<unsigned int>(copy_params.buffer_size), _O_BINARY);
4176 #else
4177  auto pipe_res = pipe(fd);
4178 #endif
4179  if (pipe_res < 0) {
4180  throw std::runtime_error(std::string("failed to create a pipe: ") + strerror(errno));
4181  }
4182 #ifndef _WIN32
4183  signal(SIGPIPE, SIG_IGN);
4184 #endif
4185 
4186  std::exception_ptr teptr;
4187  // create a thread to read uncompressed byte stream out of pipe and
4188  // feed into importDelimited()
4189  ImportStatus ret1;
4190  auto th_pipe_reader = std::thread([&]() {
4191  try {
4192  // importDelimited will read from FILE* p_file
4193  if (0 == (p_file = fdopen(fd[0], "r"))) {
4194  throw std::runtime_error(std::string("failed to open a pipe: ") +
4195  strerror(errno));
4196  }
4197 
4198  // in the future, depending on the data types of this uncompressed stream,
4199  // it can be fed into other functions such as importParquet, etc.
4200  ret1 = importDelimited(file_path, true, session_info);
4201 
4202  } catch (...) {
4203  if (!teptr) { // no replace
4204  teptr = std::current_exception();
4205  }
4206  }
4207 
4208  if (p_file) {
4209  fclose(p_file);
4210  }
4211  p_file = 0;
4212  });
4213 
4214  // create a thread to iterate over all files (in all archives) and
4215  // forward the uncompressed byte stream to fd[1], which is
4216  // then fed into importDelimited, importParquet, etc.
4217  auto th_pipe_writer = std::thread([&]() {
4218  std::unique_ptr<S3Archive> us3arch;
4219  bool stop = false;
4220  for (size_t fi = 0; !stop && fi < file_paths.size(); fi++) {
4221  try {
4222  auto file_path = file_paths[fi];
4223  std::unique_ptr<Archive> uarch;
4224  std::map<int, std::string> url_parts;
4225  Archive::parse_url(file_path, url_parts);
4226  const std::string S3_objkey_url_scheme = "s3ok";
4227  if ("file" == url_parts[2] || "" == url_parts[2]) {
4228  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4229  } else if ("s3" == url_parts[2]) {
4230 #ifdef HAVE_AWS_S3
4231  // new a S3Archive with a shared s3client.
4232  // should be safe b/c no wildcard with s3 url
4233  us3arch.reset(new S3Archive(file_path,
4243  us3arch->init_for_read();
4244  total_file_size += us3arch->get_total_file_size();
4245  // not land all files here but one by one in following iterations
4246  for (const auto& objkey : us3arch->get_objkeys()) {
4247  file_paths.emplace_back(std::string(S3_objkey_url_scheme) + "://" + objkey);
4248  }
4249  continue;
4250 #else
4251  throw std::runtime_error("AWS S3 support not available");
4252 #endif // HAVE_AWS_S3
4253  } else if (S3_objkey_url_scheme == url_parts[2]) {
4254 #ifdef HAVE_AWS_S3
4255  auto objkey = file_path.substr(3 + S3_objkey_url_scheme.size());
4256  auto file_path =
4257  us3arch->land(objkey, teptr, nullptr != dynamic_cast<Detector*>(this));
4258  if (0 == file_path.size()) {
4259  throw std::runtime_error(std::string("failed to land s3 object: ") + objkey);
4260  }
4261  uarch.reset(new PosixFileArchive(file_path, copy_params.plain_text));
4262  // file not removed until file closed
4263  us3arch->vacuum(objkey);
4264 #else
4265  throw std::runtime_error("AWS S3 support not available");
4266 #endif // HAVE_AWS_S3
4267  }
4268 #if 0 // TODO(ppan): implement and enable any other archive class
4269  else
4270  if ("hdfs" == url_parts[2])
4271  uarch.reset(new HdfsArchive(file_path));
4272 #endif
4273  else {
4274  throw std::runtime_error(std::string("unsupported archive url: ") + file_path);
4275  }
4276 
4277  // init the archive for read
4278  auto& arch = *uarch;
4279 
4280  // coming here, the archive of url should be ready to be read, unarchived
4281  // and uncompressed by libarchive into a byte stream (in csv) for the pipe
4282  const void* buf;
4283  size_t size;
4284  bool just_saw_archive_header;
4285  bool is_detecting = nullptr != dynamic_cast<Detector*>(this);
4286  bool first_text_header_skipped = false;
4287  // start reading uncompressed bytes of this archive from libarchive
4288  // note! this archive may contain more than one file!
4289  file_offsets.push_back(0);
4290  size_t num_block_read = 0;
4291  while (!stop && !!(just_saw_archive_header = arch.read_next_header())) {
4292  bool insert_line_delim_after_this_file = false;
4293  while (!stop) {
4294  int64_t offset{-1};
4295  auto ok = arch.read_data_block(&buf, &size, &offset);
4296  // can't use (uncompressed) size, so track (max) file offset.
4297  // also we want to capture offset even on e.o.f.
4298  if (offset > 0) {
4299  std::unique_lock<std::mutex> lock(file_offsets_mutex);
4300  file_offsets.back() = offset;
4301  }
4302  if (!ok) {
4303  break;
4304  }
4305  // one subtle point here: we now concatenate all files
4306  // into a single FILE stream with which we call importDelimited
4307  // only once. this would make it assume that this 'single'
4308  // stream has only one header line, while actually
4309  // we may have one header line for each of the files.
4310  // so we need to skip header lines here instead of in importDelimited.
4311  const char* buf2 = (const char*)buf;
4312  int size2 = size;
4314  just_saw_archive_header && (first_text_header_skipped || !is_detecting)) {
4315  while (size2-- > 0) {
4316  if (*buf2++ == copy_params.line_delim) {
4317  break;
4318  }
4319  }
4320  if (size2 <= 0) {
4321  LOG(WARNING) << "No line delimiter in block." << std::endl;
4322  } else {
4323  just_saw_archive_header = false;
4324  first_text_header_skipped = true;
4325  }
4326  }
4327  // On very rare occasions the write pipe somehow operates in a mode similar
4328  // to non-blocking, while pipe(fds) should behave like pipe2(fds, 0), which
4329  // means blocking mode. With such an unreliable blocking mode, a possible fix
4330  // is to loop writing till no bytes are left, otherwise we get the annoying
4331  // `failed to write pipe: Success`...
4332  if (size2 > 0) {
4333  int nremaining = size2;
4334  while (nremaining > 0) {
4335  // try to write the entire remainder of the buffer to the pipe
4336  int nwritten = write(fd[1], buf2, nremaining);
4337  // how did we do?
4338  if (nwritten < 0) {
4339  // something bad happened
4340  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4341  // ignore these, assume nothing written, try again
4342  nwritten = 0;
4343  } else if (errno == EPIPE &&
4345  // the reader thread has shut down the pipe from the read end
4346  stop = true;
4347  break;
4348  } else {
4349  // a real error
4350  throw std::runtime_error(
4351  std::string("failed or interrupted write to pipe: ") +
4352  strerror(errno));
4353  }
4354  } else if (nwritten == nremaining) {
4355  // we wrote everything; we're done
4356  break;
4357  }
4358  // only wrote some (or nothing), try again
4359  nremaining -= nwritten;
4360  buf2 += nwritten;
4361  // no exception when too many rejected
4364  stop = true;
4365  break;
4366  }
4367  }
4368  // check that this file (buf for size) ended with a line delim
4369  if (size > 0) {
4370  const char* plast = static_cast<const char*>(buf) + (size - 1);
4371  insert_line_delim_after_this_file = (*plast != copy_params.line_delim);
4372  }
4373  }
4374  ++num_block_read;
4375  }
4376 
4377  // if that file didn't end with a line delim, we insert one here to terminate
4378  // that file's stream. use a loop for the same reason as above
4379  if (insert_line_delim_after_this_file) {
4380  while (true) {
4381  // write the delim char to the pipe
4382  int nwritten = write(fd[1], &copy_params.line_delim, 1);
4383  // how did we do?
4384  if (nwritten < 0) {
4385  // something bad happened
4386  if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
4387  // ignore these, assume nothing written, try again
4388  nwritten = 0;
4389  } else if (errno == EPIPE &&
4391  // the reader thread has shut down the pipe from the read end
4392  stop = true;
4393  break;
4394  } else {
4395  // a real error
4396  throw std::runtime_error(
4397  std::string("failed or interrupted write to pipe: ") +
4398  strerror(errno));
4399  }
4400  } else if (nwritten == 1) {
4401  // we wrote it; we're done
4402  break;
4403  }
4404  }
4405  }
4406  }
4407  } catch (...) {
4408  // when an import is aborted because of too many data errors or because a
4409  // detection has ended, any exception thrown by the s3 sdk or libarchive is okay
4410  // and should be suppressed.
4413  break;
4414  }
4415  if (import_status_.rows_completed > 0) {
4416  if (nullptr != dynamic_cast<Detector*>(this)) {
4417  break;
4418  }
4419  }
4420  if (!teptr) { // no replace
4421  teptr = std::current_exception();
4422  }
4423  break;
4424  }
4425  }
4426  // close writer end
4427  close(fd[1]);
4428  });
4429 
4430  th_pipe_reader.join();
4431  th_pipe_writer.join();
4432 
4433  // rethrow any exception that happened before this point
4434  if (teptr) {
4435  std::rethrow_exception(teptr);
4436  }
4437 }
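
The pipe plumbing above can be reduced to a small standalone pattern: one thread pushes bytes into a pipe (retrying partial or interrupted writes) and then closes the write end, while another thread wraps the read end in a FILE* and consumes it as one continuous stream, which is the role importDelimited plays. A minimal sketch, assuming a POSIX system (the pipe() branch, not the Windows _pipe() one); the CSV payload and the line-printing consumer are placeholders.

#include <cerrno>
#include <cstdio>
#include <string>
#include <thread>
#include <unistd.h>

// Standalone sketch of the reader/writer pattern used by import_compressed():
// a writer thread feeds bytes into a pipe, and a reader thread consumes them
// through a FILE* as if they were a single uncompressed stream.
int main() {
  int fd[2];
  if (pipe(fd) < 0) {
    perror("pipe");
    return 1;
  }

  // reader: wrap the read end in a FILE* -- the role importDelimited plays
  std::thread th_pipe_reader([&]() {
    FILE* p_file = fdopen(fd[0], "r");
    char line[256];
    while (fgets(line, sizeof(line), p_file)) {
      fputs(line, stdout);  // stand-in for parsing a delimited row
    }
    fclose(p_file);  // also closes fd[0]
  });

  // writer: stand-in for the libarchive loop feeding uncompressed bytes
  std::thread th_pipe_writer([&]() {
    const std::string data = "a,b,c\n1,2,3\n";
    const char* buf = data.data();
    ssize_t nremaining = static_cast<ssize_t>(data.size());
    while (nremaining > 0) {
      ssize_t nwritten = write(fd[1], buf, static_cast<size_t>(nremaining));
      if (nwritten < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
          continue;  // nothing written, try again
        }
        break;  // a real error; the code above throws here instead
      }
      nremaining -= nwritten;
      buf += nwritten;
    }
    close(fd[1]);  // close the write end so the reader sees EOF
  });

  th_pipe_reader.join();
  th_pipe_writer.join();
  return 0;
}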


virtual ImportStatus import_export::DataStreamSink::importDelimited ( const std::string &  file_path,
const bool  decompressed,
const Catalog_Namespace::SessionInfo *  session_info 
)
pure virtual

Implemented in import_export::Importer, and import_export::Detector.

Referenced by import_compressed().
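
As a rough illustration of this contract, a concrete sink derives from DataStreamSink and supplies importDelimited; when archivePlumber()/import_compressed() drive it, the protected p_file member carries the concatenated, already-decompressed byte stream. The subclass below is invented purely for illustration (the real implementers are import_export::Importer and import_export::Detector) and assumes that ImportStatus value-initializes its counters to zero.

#include <cstdio>

#include <Importer.h>

// Hypothetical subclass, for illustration only; not part of the codebase.
class LineCountingSink : public import_export::DataStreamSink {
 public:
  using DataStreamSink::DataStreamSink;  // inherit the (CopyParams, file_path) ctor

  import_export::ImportStatus importDelimited(
      const std::string& file_path,
      const bool decompressed,
      const Catalog_Namespace::SessionInfo* session_info) override {
    // When driven by archivePlumber()/import_compressed(), the protected
    // p_file member is the read end of the pipe carrying the concatenated,
    // already-decompressed byte stream of all input files.
    import_export::ImportStatus status{};  // assumed to value-initialize counters
    int c;
    while ((c = fgetc(p_file)) != EOF) {
      if (c == copy_params.line_delim) {
        ++status.rows_completed;
      }
    }
    return status;
  }
};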


Member Data Documentation

std::vector<size_t> import_export::DataStreamSink::file_offsets
protected

Definition at line 717 of file Importer.h.

Referenced by import_compressed(), and import_export::Importer::importDelimited().

std::mutex import_export::DataStreamSink::file_offsets_mutex
protected

Definition at line 718 of file Importer.h.

Referenced by import_compressed(), and import_export::Importer::importDelimited().

const std::string import_export::DataStreamSink::file_path
protected

Definition at line 712 of file Importer.h.

Referenced by archivePlumber(), and import_export::Importer::Importer().

size_t import_export::DataStreamSink::total_file_size {0}
protected

The documentation for this class was generated from the following files:

Importer.h
Importer.cpp