OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::CsvDataWrapper Class Reference

#include <CsvDataWrapper.h>

+ Inheritance diagram for foreign_storage::CsvDataWrapper:
+ Collaboration diagram for foreign_storage::CsvDataWrapper:

Public Member Functions

 CsvDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 

Static Public Member Functions

static void validateOptions (const ForeignTable *foreign_table)
 
static std::vector< std::string_view > getSupportedOptions ()
 

Private Member Functions

 CsvDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
std::string getFilePath ()
 
import_export::CopyParams validateAndGetCopyParams ()
 
void validateFilePath ()
 
std::string validateAndGetStringWithLength (const std::string &option_name, const size_t expected_num_chars)
 
std::optional< bool > validateAndGetBoolValue (const std::string &option_name)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 

Private Attributes

std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< CsvReader > csv_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::mutex file_access_mutex_
 
std::mutex file_regions_mutex_
 
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
 
std::map< ChunkKey, size_t > chunk_byte_count_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 

Static Private Attributes

static constexpr std::array< char const *, 13 > supported_options_
 

Detailed Description

Definition at line 55 of file CsvDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::CsvDataWrapper::CsvDataWrapper ( const int  db_id,
const ForeignTable * foreign_table
)

Definition at line 35 of file CsvDataWrapper.cpp.

36  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const ForeignTable * foreign_table)
private

Definition at line 38 of file CsvDataWrapper.cpp.

39  : db_id_(-1), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_

Member Function Documentation

std::string foreign_storage::CsvDataWrapper::getFilePath ( )
private

Definition at line 52 of file CsvDataWrapper.cpp.

References foreign_storage::ForeignServer::BASE_PATH_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, and foreign_storage::ForeignServer::STORAGE_TYPE_KEY.

Referenced by populateChunkMetadata(), restoreDataWrapperInternals(), and validateFilePath().

52  {
53  auto& server_options = foreign_table_->foreign_server->options;
54  auto file_path_entry = foreign_table_->options.find("FILE_PATH");
55  std::string file_path{};
56  if (file_path_entry != foreign_table_->options.end()) {
57  file_path = file_path_entry->second;
58  }
59  std::string base_path{};
60  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
62  auto base_path_entry = server_options.find(ForeignServer::BASE_PATH_KEY);
63  if (base_path_entry == server_options.end()) {
64  throw std::runtime_error{"No base path found in foreign server options."};
65  }
66  base_path = base_path_entry->second;
67  const std::string separator{boost::filesystem::path::preferred_separator};
68  return std::regex_replace(
69  base_path + separator + file_path, std::regex{separator + "{2,}"}, separator);
70  } else {
71  // Just return the file path as a prefix
72  return file_path;
73  }
74 }
std::map< std::string, std::string, std::less<> > options
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:39
static constexpr std::string_view BASE_PATH_KEY
Definition: ForeignServer.h:44

+ Here is the caller graph for this function:

std::vector< std::string_view > foreign_storage::CsvDataWrapper::getSupportedOptions ( )
static

Definition at line 47 of file CsvDataWrapper.cpp.

References supported_options_.

Referenced by CreateForeignTableCommand::setTableDetails().

47  {
48  return std::vector<std::string_view>{supported_options_.begin(),
49  supported_options_.end()};
50 }
static constexpr std::array< char const *, 13 > supported_options_

+ Here is the caller graph for this function:

bool foreign_storage::CsvDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1056 of file CsvDataWrapper.cpp.

References is_restored_.

1056  {
1057  return is_restored_;
1058 }
void foreign_storage::CsvDataWrapper::populateChunkBuffers ( std::map< ChunkKey, AbstractBuffer * > &  required_buffers,
std::map< ChunkKey, AbstractBuffer * > &  optional_buffers 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 232 of file CsvDataWrapper.cpp.

References CHECK, Catalog_Namespace::Catalog::checkedGet(), CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_columns(), populateChunkMapForColumns(), populateChunks(), and TableDescriptor::tableId.

234  {
235  auto timer = DEBUG_TIMER(__func__);
237  CHECK(!required_buffers.empty());
238 
239  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
240  std::set<const ColumnDescriptor*> required_columns =
241  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
242  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
244  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
245 
246  if (!optional_buffers.empty()) {
247  std::set<const ColumnDescriptor*> optional_columns;
248  optional_columns =
249  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
251  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
252  }
253  populateChunks(column_id_to_chunk_map, fragment_id);
254 
255  for (auto& entry : column_id_to_chunk_map) {
256  entry.second.setBuffer(nullptr);
257  entry.second.setIndexBuffer(nullptr);
258  }
259 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer * > &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const std::map< ChunkKey, AbstractBuffer * > &  buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 184 of file CsvDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, foreign_table_, Data_Namespace::AbstractBuffer::reserve(), Chunk_NS::Chunk::setBuffer(), Data_Namespace::AbstractBuffer::size(), TableDescriptor::tableId, and UNREACHABLE.

Referenced by populateChunkBuffers().

188  {
189  for (const auto column : columns) {
190  ChunkKey data_chunk_key;
191  AbstractBuffer* data_buffer = nullptr;
192  AbstractBuffer* index_buffer = nullptr;
193  if (column->columnType.is_varlen_indeed()) {
194  data_chunk_key = {
195  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 1};
196  ChunkKey index_chunk_key = {
197  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 2};
198 
199  CHECK(buffers.find(data_chunk_key) != buffers.end());
200  CHECK(buffers.find(index_chunk_key) != buffers.end());
201 
202  data_buffer = buffers.find(data_chunk_key)->second;
203  index_buffer = buffers.find(index_chunk_key)->second;
204 
205  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
206  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
207 
208  size_t index_offset_size{0};
209  if (column->columnType.is_string() || column->columnType.is_geometry()) {
210  index_offset_size = sizeof(StringOffsetT);
211  } else if (column->columnType.is_array()) {
212  index_offset_size = sizeof(ArrayOffsetT);
213  } else {
214  UNREACHABLE();
215  }
216  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
217  index_buffer->reserve(index_offset_size *
218  (chunk_metadata_map_[data_chunk_key]->numElements + 1));
219  } else {
220  data_chunk_key = {db_id_, foreign_table_->tableId, column->columnId, fragment_id};
221  CHECK(buffers.find(data_chunk_key) != buffers.end());
222  data_buffer = buffers.find(data_chunk_key)->second;
223  }
224  data_buffer->reserve(chunk_metadata_map_[data_chunk_key]->numBytes);
225  column_id_to_chunk_map[column->columnId] = Chunk_NS::Chunk{column};
226  column_id_to_chunk_map[column->columnId].setBuffer(data_buffer);
227  column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
228  column_id_to_chunk_map[column->columnId].initEncoder();
229  }
230 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
#define UNREACHABLE()
Definition: Logger.h:241
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:108
int32_t StringOffsetT
Definition: sqltypes.h:868
An AbstractBuffer is a unit of data management for a data manager.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int32_t ArrayOffsetT
Definition: sqltypes.h:869
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
virtual void reserve(size_t num_bytes)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
overridevirtual

Populates the provided chunk metadata vector with metadata for the table specified in the given chunk key. The metadata scan for the CSV file(s) configured for the foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with CSV content read from file, and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse by subsequent requests, in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 842 of file CsvDataWrapper.cpp.

References append_start_offset_, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_alloc_size, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_size, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::catalog, Catalog_Namespace::Catalog::checkedGet(), foreign_storage::MetadataScanMultiThreadingParams::chunk_byte_count, chunk_byte_count_, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, chunk_metadata_map_, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::columns, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::copy_params, csv_reader_, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::db_id, db_id_, DEBUG_TIMER, foreign_storage::dispatch_metadata_scan_requests(), file_regions_mutex_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), getFilePath(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers, foreign_storage::initialize_import_buffers(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::max_fragment_rows, TableDescriptor::maxFragRows, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::ForeignServer::STORAGE_TYPE_KEY, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::table_id, TableDescriptor::tableId, UNREACHABLE, and validateAndGetCopyParams().

842  {
843  auto timer = DEBUG_TIMER(__func__);
844  chunk_metadata_map_.clear();
845 
846  const auto copy_params = validateAndGetCopyParams();
847  const auto file_path = getFilePath();
849  auto& server_options = foreign_table_->foreign_server->options;
850  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
851  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
853  csv_reader_->checkForMoreRows(append_start_offset_);
854  } else {
855  UNREACHABLE();
856  }
857  } else {
859  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
861  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
862  } else {
863  UNREACHABLE();
864  }
865  num_rows_ = 0;
867  }
868 
869  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
870  foreign_table_->tableId, false, false, true);
871  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
872  for (auto column : columns) {
873  column_by_id[column->columnId] = column;
874  }
875  MetadataScanMultiThreadingParams multi_threading_params;
876 
877  // Restore previous chunk data
878  if (foreign_table_->isAppendMode()) {
879  multi_threading_params.chunk_byte_count = chunk_byte_count_;
880  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
881  }
882 
883  if (!csv_reader_->isScanFinished()) {
884  auto buffer_size = get_buffer_size(copy_params,
885  csv_reader_->isRemainingSizeKnown(),
886  csv_reader_->getRemainingSize());
887  auto thread_count = get_thread_count(copy_params,
888  csv_reader_->isRemainingSizeKnown(),
889  csv_reader_->getRemainingSize(),
890  buffer_size);
891  multi_threading_params.continue_processing = true;
892 
893  std::vector<std::future<void>> futures{};
894  for (size_t i = 0; i < thread_count; i++) {
895  futures.emplace_back(std::async(std::launch::async,
897  std::ref(multi_threading_params),
899  std::ref(file_regions_mutex_)));
900 
901  multi_threading_params.request_pool.emplace();
902  csv_file_buffer_parser::ParseBufferRequest& parse_buffer_request =
903  multi_threading_params.request_pool.back();
904  initialize_import_buffers(columns, catalog, parse_buffer_request.import_buffers);
905  parse_buffer_request.copy_params = copy_params;
906  parse_buffer_request.columns = columns;
907  parse_buffer_request.catalog = catalog;
908  parse_buffer_request.db_id = db_id_;
909  parse_buffer_request.table_id = foreign_table_->tableId;
910  parse_buffer_request.max_fragment_rows = foreign_table_->maxFragRows;
911  parse_buffer_request.buffer = std::make_unique<char[]>(buffer_size);
912  parse_buffer_request.buffer_size = buffer_size;
913  parse_buffer_request.buffer_alloc_size = buffer_size;
914  }
915 
916  try {
918  file_path,
919  (*csv_reader_),
920  copy_params,
921  multi_threading_params,
922  num_rows_,
924  } catch (...) {
925  {
926  std::unique_lock<std::mutex> pending_requests_lock(
927  multi_threading_params.pending_requests_mutex);
928  multi_threading_params.continue_processing = false;
929  }
930  multi_threading_params.pending_requests_condition.notify_all();
931  throw;
932  }
933 
934  for (auto& future : futures) {
935  // get() instead of wait() because we need to propagate potential exceptions.
936  future.get();
937  }
938  }
939 
940  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
941  auto chunk_metadata =
942  buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
943  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
944  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
945  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
946  chunk_metadata_map_[chunk_key] = chunk_metadata;
947  }
948 
949  // Save chunk data
950  if (foreign_table_->isAppendMode()) {
951  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
952  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
953  }
954 
955  for (auto& entry : fragment_id_to_file_regions_map_) {
956  std::sort(entry.second.begin(), entry.second.end());
957  }
958 }
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< std::string, std::string, std::less<> > options
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
#define UNREACHABLE()
Definition: Logger.h:241
void initialize_import_buffers(const std::list< const ColumnDescriptor * > &columns, std::shared_ptr< Catalog_Namespace::Catalog > catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const std::set< int > &column_filter_set={})
std::unique_ptr< CsvReader > csv_reader_
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
std::map< int, FileRegions > fragment_id_to_file_regions_map_
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:39
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex)
import_export::CopyParams validateAndGetCopyParams()
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map - map of column id to chunks to be populated
fragment_id - fragment id of given chunks

Definition at line 399 of file CsvDataWrapper.cpp.

References foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_alloc_size, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_size, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::catalog, CHECK, Catalog_Namespace::Catalog::checkedGet(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::columns, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::copy_params, csv_reader_, db_id_, file_access_mutex_, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::import_buffers, foreign_storage::initialize_import_buffers(), foreign_storage::parse_file_regions(), run_benchmark_import::result, TableDescriptor::tableId, and validateAndGetCopyParams().

Referenced by populateChunkBuffers().

401  {
402  const auto copy_params = validateAndGetCopyParams();
403 
404  CHECK(!column_id_to_chunk_map.empty());
405  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
406  CHECK(!file_regions.empty());
407 
408  const auto buffer_size = get_buffer_size(file_regions);
409  const auto thread_count = get_thread_count(copy_params, file_regions);
410 
412  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
413  foreign_table_->tableId, false, false, true);
414  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
415 
416  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
417  parse_file_requests.reserve(thread_count);
418  std::vector<std::future<ParseFileRegionResult>> futures{};
419  std::set<int> column_filter_set;
420  for (const auto& pair : column_id_to_chunk_map) {
421  column_filter_set.insert(pair.first);
422  }
423  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
424  parse_file_requests.emplace_back();
425  csv_file_buffer_parser::ParseBufferRequest& parse_file_request =
426  parse_file_requests.back();
427  parse_file_request.buffer = std::make_unique<char[]>(buffer_size);
428  parse_file_request.buffer_size = buffer_size;
429  parse_file_request.buffer_alloc_size = buffer_size;
430  parse_file_request.copy_params = copy_params;
431  parse_file_request.columns = columns;
432  parse_file_request.catalog = catalog;
434  columns, catalog, parse_file_request.import_buffers, column_filter_set);
435 
436  auto start_index = i;
437  auto end_index =
438  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
439  futures.emplace_back(std::async(std::launch::async,
441  std::ref(file_regions),
442  start_index,
443  end_index,
444  std::ref((*csv_reader_)),
445  std::ref(file_access_mutex_),
446  std::ref(parse_file_request),
447  std::ref(column_id_to_chunk_map)));
448  }
449 
450  std::set<ParseFileRegionResult> load_file_region_results{};
451  for (auto& future : futures) {
452  future.wait();
453  load_file_region_results.emplace(future.get());
454  }
455 
456  for (auto result : load_file_region_results) {
457  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
458  chunk.appendData(
459  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
460  }
461  }
462 }
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, std::mutex &file_access_mutex, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void initialize_import_buffers(const std::list< const ColumnDescriptor * > &columns, std::shared_ptr< Catalog_Namespace::Catalog > catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const std::set< int > &column_filter_set={})
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
import_export::CopyParams validateAndGetCopyParams()

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeDataWrapperInternals
chunk_metadata - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1007 of file CsvDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_byte_count_, chunk_encoder_buffers_, chunk_metadata_map_, csv_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), getFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::ForeignServer::STORAGE_TYPE_KEY, UNREACHABLE, and validateAndGetCopyParams().

1009  {
1010  auto d = json_utils::read_from_file(file_path);
1011  CHECK(d.IsObject());
1012 
1013  // Restore fragment map
1015  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1016 
1017  // Construct csv_reader with metadta
1018  CHECK(d.HasMember("reader_metadata"));
1019  const auto copy_params = validateAndGetCopyParams();
1020  const auto csv_file_path = getFilePath();
1021  auto& server_options = foreign_table_->foreign_server->options;
1022  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
1024  csv_reader_ = std::make_unique<LocalMultiFileReader>(
1025  csv_file_path, copy_params, d["reader_metadata"]);
1026  } else {
1027  UNREACHABLE();
1028  }
1029 
1031  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1032 
1033  // Now restore the internal metadata maps
1034  CHECK(chunk_metadata_map_.empty());
1035  CHECK(chunk_encoder_buffers_.empty());
1036 
1037  for (auto& pair : chunk_metadata) {
1038  chunk_metadata_map_[pair.first] = pair.second;
1039 
1040  if (foreign_table_->isAppendMode()) {
1041  // Restore encoder state for append mode
1042  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1043  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1044  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1045  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1046  pair.second->numElements);
1047  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1048  pair.second->chunkStats);
1049  chunk_encoder_buffers_[pair.first]->setUpdated();
1050  chunk_byte_count_[pair.first] = pair.second->numBytes;
1051  }
1052  }
1053  is_restored_ = true;
1054 }
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< std::string, std::string, std::less<> > options
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
std::map< int, FileRegions > fragment_id_to_file_regions_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:39
#define CHECK(condition)
Definition: Logger.h:197
import_export::CopyParams validateAndGetCopyParams()

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
overridevirtual

Serializes the internal state of the data wrapper into a file at the given path, if implemented.

Parameters
file_path - location to save the file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 985 of file CsvDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, csv_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_file().

985  {
986  rapidjson::Document d;
987  d.SetObject();
988 
989  // Save fragment map
992  "fragment_id_to_file_regions_map",
993  d.GetAllocator());
994 
995  // Save csv_reader metadata
996  rapidjson::Value reader_metadata(rapidjson::kObjectType);
997  csv_reader_->serialize(reader_metadata, d.GetAllocator());
998  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
999 
1000  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1002  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1003 
1004  json_utils::write_to_file(d, file_path);
1005 }
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)

+ Here is the call graph for this function:

std::optional< bool > foreign_storage::CsvDataWrapper::validateAndGetBoolValue ( const std::string &  option_name)
private

Validates that the string value of given table option is either "true" or "false" (case insensitive). An exception is thrown if option value does not match one of the two possible values.

Parameters
option_name - name of the table option whose value is validated and returned
Returns
corresponding bool for option value. Returns an empty optional if table options do not contain provided option.

Definition at line 149 of file CsvDataWrapper.cpp.

References foreign_table_, and foreign_storage::OptionsContainer::options.

Referenced by validateAndGetCopyParams().

150  {
151  if (auto it = foreign_table_->options.find(option_name);
152  it != foreign_table_->options.end()) {
153  if (boost::iequals(it->second, "TRUE")) {
154  return true;
155  } else if (boost::iequals(it->second, "FALSE")) {
156  return false;
157  } else {
158  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
159  "\" foreign table option. "
160  "Value must be either 'true' or 'false'."};
161  }
162  }
163  return std::nullopt;
164 }
std::map< std::string, std::string, std::less<> > options
const ForeignTable * foreign_table_

+ Here is the caller graph for this function:

import_export::CopyParams foreign_storage::CsvDataWrapper::validateAndGetCopyParams ( )
private

Definition at line 85 of file CsvDataWrapper.cpp.

References foreign_table_, import_export::HAS_HEADER, import_export::NO_HEADER, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, validateAndGetBoolValue(), and validateAndGetStringWithLength().

Referenced by populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

85  {
86  import_export::CopyParams copy_params{};
87  copy_params.plain_text = true;
88  if (const auto& value = validateAndGetStringWithLength("ARRAY_DELIMITER", 1);
89  !value.empty()) {
90  copy_params.array_delim = value[0];
91  }
92  if (const auto& value = validateAndGetStringWithLength("ARRAY_MARKER", 2);
93  !value.empty()) {
94  copy_params.array_begin = value[0];
95  copy_params.array_end = value[1];
96  }
97  if (auto it = foreign_table_->options.find("BUFFER_SIZE");
98  it != foreign_table_->options.end()) {
99  copy_params.buffer_size = std::stoi(it->second);
100  }
101  if (const auto& value = validateAndGetStringWithLength("DELIMITER", 1);
102  !value.empty()) {
103  copy_params.delimiter = value[0];
104  }
105  if (const auto& value = validateAndGetStringWithLength("ESCAPE", 1); !value.empty()) {
106  copy_params.escape = value[0];
107  }
108  auto has_header = validateAndGetBoolValue("HEADER");
109  if (has_header.has_value()) {
110  if (has_header.value()) {
111  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
112  } else {
113  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
114  }
115  }
116  if (const auto& value = validateAndGetStringWithLength("LINE_DELIMITER", 1);
117  !value.empty()) {
118  copy_params.line_delim = value[0];
119  }
120  copy_params.lonlat = validateAndGetBoolValue("LONLAT").value_or(copy_params.lonlat);
121 
122  if (auto it = foreign_table_->options.find("NULLS");
123  it != foreign_table_->options.end()) {
124  copy_params.null_str = it->second;
125  }
126  if (const auto& value = validateAndGetStringWithLength("QUOTE", 1); !value.empty()) {
127  copy_params.quote = value[0];
128  }
129  copy_params.quoted = validateAndGetBoolValue("QUOTED").value_or(copy_params.quoted);
130  return copy_params;
131 }
std::map< std::string, std::string, std::less<> > options
std::string validateAndGetStringWithLength(const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validateAndGetBoolValue(const std::string &option_name)
const ForeignTable * foreign_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::CsvDataWrapper::validateAndGetStringWithLength ( const std::string &  option_name,
const size_t  expected_num_chars 
)
private

Validates that the value of the given table option has the expected number of characters. An exception is thrown if the number of characters does not match.

Parameters
option_name - name of the table option whose value is validated and returned
expected_num_chars - expected number of characters for the option value
Returns
The value of the option if the number of characters matches. Returns an empty string if the table options do not contain the provided option.

Definition at line 133 of file CsvDataWrapper.cpp.

References foreign_table_, foreign_storage::OptionsContainer::options, and to_string().

Referenced by validateAndGetCopyParams().

135  {
136  if (auto it = foreign_table_->options.find(option_name);
137  it != foreign_table_->options.end()) {
138  if (it->second.length() != expected_num_chars) {
139  throw std::runtime_error{"Value of \"" + option_name +
140  "\" foreign table option has the wrong number of "
141  "characters. Expected " +
142  std::to_string(expected_num_chars) + " character(s)."};
143  }
144  return it->second;
145  }
146  return "";
147 }
std::map< std::string, std::string, std::less<> > options
std::string to_string(char const *&&v)
const ForeignTable * foreign_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::validateFilePath ( )
private

Definition at line 76 of file CsvDataWrapper.cpp.

References foreign_storage::ForeignTable::foreign_server, foreign_table_, getFilePath(), ddl_utils::IMPORT, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and ddl_utils::validate_allowed_file_path().

76  {
77  auto& server_options = foreign_table_->foreign_server->options;
78  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
82  }
83 }
std::map< std::string, std::string, std::less<> > options
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:611
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:39

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::validateOptions ( const ForeignTable foreign_table)
static

Definition at line 41 of file CsvDataWrapper.cpp.

Referenced by CreateForeignTableCommand::setTableDetails().

41  {
42  CsvDataWrapper data_wrapper{foreign_table};
43  data_wrapper.validateAndGetCopyParams();
44  data_wrapper.validateFilePath();
45 }
CsvDataWrapper(const int db_id, const ForeignTable *foreign_table)

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::CsvDataWrapper::append_start_offset_
private
std::map<ChunkKey, size_t> foreign_storage::CsvDataWrapper::chunk_byte_count_
private

Definition at line 132 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::CsvDataWrapper::chunk_encoder_buffers_
private

Definition at line 131 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::CsvDataWrapper::chunk_metadata_map_
private
std::unique_ptr<CsvReader> foreign_storage::CsvDataWrapper::csv_reader_
private
const int foreign_storage::CsvDataWrapper::db_id_
private
std::mutex foreign_storage::CsvDataWrapper::file_access_mutex_
private

Definition at line 127 of file CsvDataWrapper.h.

Referenced by populateChunks().

std::mutex foreign_storage::CsvDataWrapper::file_regions_mutex_
private

Definition at line 128 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata().

std::map<int, FileRegions> foreign_storage::CsvDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::CsvDataWrapper::is_restored_
private

Definition at line 138 of file CsvDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::CsvDataWrapper::num_rows_
private
constexpr std::array<char const*, 13> foreign_storage::CsvDataWrapper::supported_options_
static private
Initial value:
{"BASE_PATH",
"FILE_PATH",
"ARRAY_DELIMITER",
"ARRAY_MARKER",
"BUFFER_SIZE",
"DELIMITER",
"ESCAPE",
"HEADER",
"LINE_DELIMITER",
"LONLAT",
"NULLS",
"QUOTE",
"QUOTED"}

Definition at line 139 of file CsvDataWrapper.h.

Referenced by getSupportedOptions().


The documentation for this class was generated from the following files: