OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::CsvDataWrapper Class Reference

#include <CsvDataWrapper.h>

+ Inheritance diagram for foreign_storage::CsvDataWrapper:
+ Collaboration diagram for foreign_storage::CsvDataWrapper:

Public Member Functions

 CsvDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 

Static Public Member Functions

static void validateOptions (const ForeignTable *foreign_table)
 
static std::vector
< std::string_view > 
getSupportedOptions ()
 

Private Member Functions

 CsvDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
std::string getFilePath ()
 
import_export::CopyParams validateAndGetCopyParams ()
 
void validateFilePath ()
 
std::string validateAndGetStringWithLength (const std::string &option_name, const size_t expected_num_chars)
 
std::optional< bool > validateAndGetBoolValue (const std::string &option_name)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< CsvReader > csv_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::mutex file_access_mutex_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
std::map< ChunkKey, size_t > chunk_byte_count_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 

Static Private Attributes

static constexpr std::array
< char const *, 11 > 
supported_options_
 

Detailed Description

Definition at line 66 of file CsvDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::CsvDataWrapper::CsvDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table 
)

Definition at line 37 of file CsvDataWrapper.cpp.

38  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const ForeignTable *  foreign_table)
private

Definition at line 40 of file CsvDataWrapper.cpp.

41  : db_id_(-1), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_

Member Function Documentation

std::string foreign_storage::CsvDataWrapper::getFilePath ( )
private
std::vector< std::string_view > foreign_storage::CsvDataWrapper::getSupportedOptions ( )
static

Definition at line 49 of file CsvDataWrapper.cpp.

References supported_options_.

49  {
50  return std::vector<std::string_view>{supported_options_.begin(),
51  supported_options_.end()};
52 }
static constexpr std::array< char const *, 11 > supported_options_
bool foreign_storage::CsvDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1199 of file CsvDataWrapper.cpp.

References is_restored_.

1199  {
1200  return is_restored_;
1201 }
void foreign_storage::CsvDataWrapper::populateChunkBuffers ( std::map< ChunkKey, AbstractBuffer * > &  required_buffers,
std::map< ChunkKey, AbstractBuffer * > &  optional_buffers 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers- chunk buffers that must always be populated
optional_buffers- chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 215 of file CsvDataWrapper.cpp.

References CHECK, Catalog_Namespace::SysCatalog::checkedGetCatalog(), CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::instance(), populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

217  {
218  auto timer = DEBUG_TIMER(__func__);
220  CHECK(!required_buffers.empty());
221 
222  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
223  std::set<const ColumnDescriptor*> required_columns =
224  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
225  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
227  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
228 
229  if (!optional_buffers.empty()) {
230  std::set<const ColumnDescriptor*> optional_columns;
231  optional_columns =
232  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
234  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
235  }
236  populateChunks(column_id_to_chunk_map, fragment_id);
237  updateMetadata(column_id_to_chunk_map, fragment_id);
238  for (auto& entry : column_id_to_chunk_map) {
239  entry.second.setBuffer(nullptr);
240  entry.second.setIndexBuffer(nullptr);
241  }
242 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static SysCatalog & instance()
Definition: SysCatalog.h:288
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer * > &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const std::map< ChunkKey, AbstractBuffer * > &  buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 168 of file CsvDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, foreign_table_, Data_Namespace::AbstractBuffer::reserve(), Chunk_NS::Chunk::setBuffer(), Data_Namespace::AbstractBuffer::size(), TableDescriptor::tableId, and UNREACHABLE.

Referenced by populateChunkBuffers().

172  {
173  for (const auto column : columns) {
174  ChunkKey data_chunk_key;
175  AbstractBuffer* data_buffer = nullptr;
176  AbstractBuffer* index_buffer = nullptr;
177  if (column->columnType.is_varlen_indeed()) {
178  data_chunk_key = {
179  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 1};
180  ChunkKey index_chunk_key = {
181  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 2};
182 
183  CHECK(buffers.find(data_chunk_key) != buffers.end());
184  CHECK(buffers.find(index_chunk_key) != buffers.end());
185 
186  data_buffer = buffers.find(data_chunk_key)->second;
187  index_buffer = buffers.find(index_chunk_key)->second;
188  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
189  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
190 
191  size_t index_offset_size{0};
192  if (column->columnType.is_string() || column->columnType.is_geometry()) {
193  index_offset_size = sizeof(StringOffsetT);
194  } else if (column->columnType.is_array()) {
195  index_offset_size = sizeof(ArrayOffsetT);
196  } else {
197  UNREACHABLE();
198  }
199  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
200  index_buffer->reserve(index_offset_size *
201  (chunk_metadata_map_[data_chunk_key]->numElements + 1));
202  } else {
203  data_chunk_key = {db_id_, foreign_table_->tableId, column->columnId, fragment_id};
204  CHECK(buffers.find(data_chunk_key) != buffers.end());
205  data_buffer = buffers.find(data_chunk_key)->second;
206  }
207  data_buffer->reserve(chunk_metadata_map_[data_chunk_key]->numBytes);
208  column_id_to_chunk_map[column->columnId] = Chunk_NS::Chunk{column};
209  column_id_to_chunk_map[column->columnId].setBuffer(data_buffer);
210  column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
211  column_id_to_chunk_map[column->columnId].initEncoder();
212  }
213 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
#define UNREACHABLE()
Definition: Logger.h:241
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:108
int32_t StringOffsetT
Definition: sqltypes.h:919
An AbstractBuffer is a unit of data management for a data manager.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int32_t ArrayOffsetT
Definition: sqltypes.h:920
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
virtual void reserve(size_t num_bytes)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMetadata ( ChunkMetadataVector &  chunk_metadata_vector)
overridevirtual

Populates the provided chunk metadata vector with metadata for the table specified in the given chunk key. The metadata scan for the CSV file(s) configured for the foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects (which contain buffers with CSV content read from file) and adding these request objects to a queue that is consumed by a fixed number of threads. After a request is processed, its request object is put back into a pool for reuse by subsequent requests, in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector- vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 960 of file CsvDataWrapper.cpp.

References foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::add_placeholder_metadata(), append_start_offset_, foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, Catalog_Namespace::SysCatalog::checkedGetCatalog(), foreign_storage::MetadataScanMultiThreadingParams::chunk_byte_count, chunk_byte_count_, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, csv_reader_, db_id_, DEBUG_TIMER, foreign_storage::dispatch_metadata_scan_requests(), foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_cache_if_enabled(), foreign_storage::get_thread_count(), foreign_storage::ForeignTable::getFullFilePath(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, TableDescriptor::maxFragRows, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::ForeignServer::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, and validateAndGetCopyParams().

960  {
961  auto timer = DEBUG_TIMER(__func__);
962 
963  const auto copy_params = validateAndGetCopyParams();
964  const auto file_path = foreign_table_->getFullFilePath();
966  auto& server_options = foreign_table_->foreign_server->options;
967  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
968  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
970  csv_reader_->checkForMoreRows(append_start_offset_);
971  } else {
972  UNREACHABLE();
973  }
974  } else {
975  chunk_metadata_map_.clear();
977  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
979  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
980  } else {
981  UNREACHABLE();
982  }
983  num_rows_ = 0;
985  }
986 
987  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
988  foreign_table_->tableId, false, false, true);
989  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
990  for (auto column : columns) {
991  column_by_id[column->columnId] = column;
992  }
993  MetadataScanMultiThreadingParams multi_threading_params;
994 
995  // Restore previous chunk data
996  if (foreign_table_->isAppendMode()) {
997  multi_threading_params.chunk_byte_count = chunk_byte_count_;
998  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
999  }
1000 
1001  std::set<int> columns_to_scan;
1002  for (auto column : columns) {
1003  if (!skip_metadata_scan(column)) {
1004  columns_to_scan.insert(column->columnId);
1005  }
1006  }
1007  // Track where scan started for appends
1008  int start_row = num_rows_;
1009  if (!csv_reader_->isScanFinished()) {
1010  auto buffer_size = get_buffer_size(copy_params,
1011  csv_reader_->isRemainingSizeKnown(),
1012  csv_reader_->getRemainingSize());
1013  auto thread_count = get_thread_count(copy_params,
1014  csv_reader_->isRemainingSizeKnown(),
1015  csv_reader_->getRemainingSize(),
1016  buffer_size);
1017  multi_threading_params.continue_processing = true;
1018 
1019  std::vector<std::future<void>> futures{};
1020  for (size_t i = 0; i < thread_count; i++) {
1021  multi_threading_params.request_pool.emplace(
1022  buffer_size, copy_params, db_id_, foreign_table_, columns_to_scan);
1023 
1024  futures.emplace_back(std::async(std::launch::async,
1025  scan_metadata,
1026  std::ref(multi_threading_params),
1028  }
1029 
1030  try {
1031  dispatch_metadata_scan_requests(buffer_size,
1032  file_path,
1033  (*csv_reader_),
1034  copy_params,
1035  multi_threading_params,
1036  num_rows_,
1038  } catch (...) {
1039  {
1040  std::unique_lock<std::mutex> pending_requests_lock(
1041  multi_threading_params.pending_requests_mutex);
1042  multi_threading_params.continue_processing = false;
1043  }
1044  multi_threading_params.pending_requests_condition.notify_all();
1045  throw;
1046  }
1047 
1048  for (auto& future : futures) {
1049  // get() instead of wait() because we need to propagate potential exceptions.
1050  future.get();
1051  }
1052  }
1053 
1054  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1055  auto chunk_metadata =
1056  buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
1057  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1058  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
1059  chunk_metadata_map_[chunk_key] = chunk_metadata;
1060  }
1061 
1062  for (auto column : columns) {
1063  if (skip_metadata_scan(column)) {
1065  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1066  }
1067  }
1068 
1069  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1070  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1071  }
1072 
1073  // Save chunk data
1074  if (foreign_table_->isAppendMode()) {
1075  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
1076  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1077  }
1078 
1079  // Any incomplete chunks should be cached now
1080  auto cache = get_cache_if_enabled(catalog);
1081  if (cache) {
1082  std::vector<ChunkKey> to_cache;
1083  for (auto& [chunk_key, buffer] : multi_threading_params.cached_chunks) {
1084  if (buffer.getBuffer()->getEncoder()->getNumElems() !=
1085  static_cast<size_t>(foreign_table_->maxFragRows)) {
1086  if (column_by_id[chunk_key[CHUNK_KEY_COLUMN_IDX]]
1087  ->columnType.is_varlen_indeed()) {
1088  ChunkKey index_chunk_key = chunk_key;
1089  index_chunk_key[4] = 2;
1090  to_cache.push_back(chunk_key);
1091  to_cache.push_back(index_chunk_key);
1092  } else {
1093  to_cache.push_back(chunk_key);
1094  }
1095  }
1096  }
1097  if (to_cache.size() > 0) {
1098  cache->cacheTableChunks(to_cache);
1099  }
1100  }
1101 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
static SysCatalog & instance()
Definition: SysCatalog.h:288
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
const ForeignServer * foreign_server
Definition: ForeignTable.h:63
#define DEBUG_TIMER(name)
Definition: Logger.h:313
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
import_export::CopyParams validateAndGetCopyParams()
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map- map of column id to chunks to be populated
fragment_id- fragment id of given chunks

Definition at line 383 of file CsvDataWrapper.cpp.

References CHECK, csv_reader_, db_id_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), foreign_storage::ForeignTable::getFullFilePath(), foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::result, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, UNREACHABLE, and validateAndGetCopyParams().

Referenced by populateChunkBuffers().

385  {
386  const auto copy_params = validateAndGetCopyParams();
387 
388  CHECK(!column_id_to_chunk_map.empty());
389  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
390  CHECK(!file_regions.empty());
391 
392  const auto buffer_size = get_buffer_size(file_regions);
393  const auto thread_count = get_thread_count(copy_params, file_regions);
394 
395  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
396 
397  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
398  parse_file_requests.reserve(thread_count);
399  std::vector<std::future<ParseFileRegionResult>> futures{};
400  std::set<int> column_filter_set;
401  for (const auto& pair : column_id_to_chunk_map) {
402  column_filter_set.insert(pair.first);
403  }
404 
405  std::vector<std::unique_ptr<CsvReader>> csv_readers;
406  rapidjson::Value reader_metadata(rapidjson::kObjectType);
407  rapidjson::Document d;
408  auto& server_options = foreign_table_->foreign_server->options;
409  csv_reader_->serialize(reader_metadata, d.GetAllocator());
410  const auto csv_file_path = foreign_table_->getFullFilePath();
411 
412  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
413  parse_file_requests.emplace_back(
414  buffer_size, copy_params, db_id_, foreign_table_, column_filter_set);
415  auto start_index = i;
416  auto end_index =
417  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
418 
419  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
421  csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
422  csv_file_path, copy_params, reader_metadata));
423  } else {
424  UNREACHABLE();
425  }
426 
427  futures.emplace_back(std::async(std::launch::async,
429  std::ref(file_regions),
430  start_index,
431  end_index,
432  std::ref(*(csv_readers.back())),
433  std::ref(parse_file_requests.back()),
434  std::ref(column_id_to_chunk_map)));
435  }
436 
437  std::vector<ParseFileRegionResult> load_file_region_results{};
438  for (auto& future : futures) {
439  future.wait();
440  load_file_region_results.emplace_back(future.get());
441  }
442 
443  for (auto result : load_file_region_results) {
444  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
445  chunk.appendData(
446  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
447  }
448  }
449 }
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
std::map< int, FileRegions > fragment_id_to_file_regions_map_
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
const ForeignTable * foreign_table_
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
const ForeignServer * foreign_server
Definition: ForeignTable.h:63
#define CHECK(condition)
Definition: Logger.h:197
import_export::CopyParams validateAndGetCopyParams()

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path- location of the file created by serializeDataWrapperInternals
chunk_metadata- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1150 of file CsvDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_byte_count_, chunk_encoder_buffers_, chunk_metadata_map_, csv_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), foreign_storage::ForeignTable::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::ForeignServer::STORAGE_TYPE_KEY, UNREACHABLE, and validateAndGetCopyParams().

1152  {
1153  auto d = json_utils::read_from_file(file_path);
1154  CHECK(d.IsObject());
1155 
1156  // Restore fragment map
1158  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1159 
1160  // Construct csv_reader with metadta
1161  CHECK(d.HasMember("reader_metadata"));
1162  const auto copy_params = validateAndGetCopyParams();
1163  const auto csv_file_path = foreign_table_->getFullFilePath();
1164  auto& server_options = foreign_table_->foreign_server->options;
1165  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
1167  csv_reader_ = std::make_unique<LocalMultiFileReader>(
1168  csv_file_path, copy_params, d["reader_metadata"]);
1169  } else {
1170  UNREACHABLE();
1171  }
1172 
1174  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1175 
1176  // Now restore the internal metadata maps
1177  CHECK(chunk_metadata_map_.empty());
1178  CHECK(chunk_encoder_buffers_.empty());
1179 
1180  for (auto& pair : chunk_metadata) {
1181  chunk_metadata_map_[pair.first] = pair.second;
1182 
1183  if (foreign_table_->isAppendMode()) {
1184  // Restore encoder state for append mode
1185  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1186  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1187  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1188  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1189  pair.second->numElements);
1190  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1191  pair.second->chunkStats);
1192  chunk_encoder_buffers_[pair.first]->setUpdated();
1193  chunk_byte_count_[pair.first] = pair.second->numBytes;
1194  }
1195  }
1196  is_restored_ = true;
1197 }
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
const ForeignTable * foreign_table_
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
const ForeignServer * foreign_server
Definition: ForeignTable.h:63
#define CHECK(condition)
Definition: Logger.h:197
import_export::CopyParams validateAndGetCopyParams()

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
overridevirtual

Serializes the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path- location to save file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1128 of file CsvDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, csv_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_file().

1128  {
1129  rapidjson::Document d;
1130  d.SetObject();
1131 
1132  // Save fragment map
1135  "fragment_id_to_file_regions_map",
1136  d.GetAllocator());
1137 
1138  // Save csv_reader metadata
1139  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1140  csv_reader_->serialize(reader_metadata, d.GetAllocator());
1141  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1142 
1143  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1145  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1146 
1147  json_utils::write_to_file(d, file_path);
1148 }
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 245 of file CsvDataWrapper.cpp.

References CHECK, Catalog_Namespace::SysCatalog::checkedGetCatalog(), chunk_metadata_map_, db_id_, foreign_table_, TableDescriptor::fragmenter, Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

247  {
248  auto fragmenter = foreign_table_->fragmenter;
249  if (fragmenter) {
251  for (auto& entry : column_id_to_chunk_map) {
252  const auto& column =
253  catalog->getMetadataForColumnUnlocked(foreign_table_->tableId, entry.first);
254  if (skip_metadata_scan(column)) {
255  ChunkKey data_chunk_key = {
256  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
257  if (column->columnType.is_varlen_indeed()) {
258  data_chunk_key.emplace_back(1);
259  }
260  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
261  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
262  auto chunk_metadata =
263  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
264  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
265  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
266  cached_metadata->numBytes = entry.second.getBuffer()->size();
267  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
268  }
269  }
270  }
271 }
std::vector< int > ChunkKey
Definition: types.h:37
static SysCatalog & instance()
Definition: SysCatalog.h:288
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::optional< bool > foreign_storage::CsvDataWrapper::validateAndGetBoolValue ( const std::string &  option_name)
private

Validates that the string value of the given table option is either "true" or "false" (case-insensitive). An exception is thrown if the option value matches neither of these two values.

Parameters
option_name - name of the table option whose value is validated and returned
Returns
the bool corresponding to the option value. Returns an empty optional if the table options do not contain the provided option.

Definition at line 127 of file CsvDataWrapper.cpp.

References foreign_table_, and foreign_storage::OptionsContainer::options.

Referenced by validateAndGetCopyParams().

128  {
129  if (auto it = foreign_table_->options.find(option_name);
130  it != foreign_table_->options.end()) {
131  if (boost::iequals(it->second, "TRUE")) {
132  return true;
133  } else if (boost::iequals(it->second, "FALSE")) {
134  return false;
135  } else {
136  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
137  "\" foreign table option. "
138  "Value must be either 'true' or 'false'."};
139  }
140  }
141  return std::nullopt;
142 }
const ForeignTable * foreign_table_

+ Here is the caller graph for this function:

import_export::CopyParams foreign_storage::CsvDataWrapper::validateAndGetCopyParams ( )
private

Definition at line 63 of file CsvDataWrapper.cpp.

References foreign_table_, import_export::HAS_HEADER, import_export::NO_HEADER, foreign_storage::OptionsContainer::options, import_export::CopyParams::plain_text, validateAndGetBoolValue(), and validateAndGetStringWithLength().

Referenced by populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

63  {
64  import_export::CopyParams copy_params{};
65  copy_params.plain_text = true;
66  if (const auto& value = validateAndGetStringWithLength("ARRAY_DELIMITER", 1);
67  !value.empty()) {
68  copy_params.array_delim = value[0];
69  }
70  if (const auto& value = validateAndGetStringWithLength("ARRAY_MARKER", 2);
71  !value.empty()) {
72  copy_params.array_begin = value[0];
73  copy_params.array_end = value[1];
74  }
75  if (auto it = foreign_table_->options.find("BUFFER_SIZE");
76  it != foreign_table_->options.end()) {
77  copy_params.buffer_size = std::stoi(it->second);
78  }
79  if (const auto& value = validateAndGetStringWithLength("DELIMITER", 1);
80  !value.empty()) {
81  copy_params.delimiter = value[0];
82  }
83  if (const auto& value = validateAndGetStringWithLength("ESCAPE", 1); !value.empty()) {
84  copy_params.escape = value[0];
85  }
86  auto has_header = validateAndGetBoolValue("HEADER");
87  if (has_header.has_value()) {
88  if (has_header.value()) {
89  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
90  } else {
91  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
92  }
93  }
94  if (const auto& value = validateAndGetStringWithLength("LINE_DELIMITER", 1);
95  !value.empty()) {
96  copy_params.line_delim = value[0];
97  }
98  copy_params.lonlat = validateAndGetBoolValue("LONLAT").value_or(copy_params.lonlat);
99 
100  if (auto it = foreign_table_->options.find("NULLS");
101  it != foreign_table_->options.end()) {
102  copy_params.null_str = it->second;
103  }
104  if (const auto& value = validateAndGetStringWithLength("QUOTE", 1); !value.empty()) {
105  copy_params.quote = value[0];
106  }
107  copy_params.quoted = validateAndGetBoolValue("QUOTED").value_or(copy_params.quoted);
108  return copy_params;
109 }
std::string validateAndGetStringWithLength(const std::string &option_name, const size_t expected_num_chars)
std::optional< bool > validateAndGetBoolValue(const std::string &option_name)
const ForeignTable * foreign_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::CsvDataWrapper::validateAndGetStringWithLength ( const std::string &  option_name,
const size_t  expected_num_chars 
)
private

Validates that the value of the given table option has the expected number of characters. An exception is thrown if the number of characters does not match.

Parameters
option_name - name of the table option whose value is validated and returned
expected_num_chars - expected number of characters for the option value
Returns
the value of the option if the number of characters matches. Returns an empty string if the table options do not contain the provided option.

Definition at line 111 of file CsvDataWrapper.cpp.

References foreign_table_, foreign_storage::OptionsContainer::options, and to_string().

Referenced by validateAndGetCopyParams().

113  {
114  if (auto it = foreign_table_->options.find(option_name);
115  it != foreign_table_->options.end()) {
116  if (it->second.length() != expected_num_chars) {
117  throw std::runtime_error{"Value of \"" + option_name +
118  "\" foreign table option has the wrong number of "
119  "characters. Expected " +
120  std::to_string(expected_num_chars) + " character(s)."};
121  }
122  return it->second;
123  }
124  return "";
125 }
std::string to_string(char const *&&v)
const ForeignTable * foreign_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::validateFilePath ( )
private

Definition at line 54 of file CsvDataWrapper.cpp.

References foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::ForeignTable::getFullFilePath(), ddl_utils::IMPORT, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and ddl_utils::validate_allowed_file_path().

54  {
55  auto& server_options = foreign_table_->foreign_server->options;
56  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
60  }
61 }
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:613
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
const ForeignTable * foreign_table_
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
const ForeignServer * foreign_server
Definition: ForeignTable.h:63

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::validateOptions ( const ForeignTable foreign_table)
static

Definition at line 43 of file CsvDataWrapper.cpp.

43  {
44  CsvDataWrapper data_wrapper{foreign_table};
45  data_wrapper.validateAndGetCopyParams();
46  data_wrapper.validateFilePath();
47 }
CsvDataWrapper(const int db_id, const ForeignTable *foreign_table)

Member Data Documentation

size_t foreign_storage::CsvDataWrapper::append_start_offset_
private
std::map<ChunkKey, size_t> foreign_storage::CsvDataWrapper::chunk_byte_count_
private

Definition at line 145 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::CsvDataWrapper::chunk_encoder_buffers_
private

Definition at line 144 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::CsvDataWrapper::chunk_metadata_map_
private
std::unique_ptr<CsvReader> foreign_storage::CsvDataWrapper::csv_reader_
private
const int foreign_storage::CsvDataWrapper::db_id_
private
std::mutex foreign_storage::CsvDataWrapper::file_access_mutex_
private

Definition at line 141 of file CsvDataWrapper.h.

std::map<int, FileRegions> foreign_storage::CsvDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::CsvDataWrapper::is_restored_
private

Definition at line 151 of file CsvDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::CsvDataWrapper::num_rows_
private
constexpr std::array<char const*, 11> foreign_storage::CsvDataWrapper::supported_options_
static private
Initial value:
{"ARRAY_DELIMITER",
"ARRAY_MARKER",
"BUFFER_SIZE",
"DELIMITER",
"ESCAPE",
"HEADER",
"LINE_DELIMITER",
"LONLAT",
"NULLS",
"QUOTE",
"QUOTED"}

Definition at line 152 of file CsvDataWrapper.h.

Referenced by getSupportedOptions().


The documentation for this class was generated from the following files: