OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::CsvDataWrapper Class Reference

#include <CsvDataWrapper.h>

+ Inheritance diagram for foreign_storage::CsvDataWrapper:
+ Collaboration diagram for foreign_storage::CsvDataWrapper:

Public Member Functions

 CsvDataWrapper ()
 
 CsvDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

 CsvDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
std::set< std::string_view > getAllCsvTableOptions () const
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegionsfragment_id_to_file_regions_map_
 
std::unique_ptr< CsvReadercsv_reader_
 
const int db_id_
 
const ForeignTableforeign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
std::map< ChunkKey, size_t > chunk_byte_count_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 

Static Private Attributes

static const std::set
< std::string_view > 
csv_table_options_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 33 of file CsvDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::CsvDataWrapper::CsvDataWrapper ( )

Definition at line 38 of file CsvDataWrapper.cpp.

38 : db_id_(-1), foreign_table_(nullptr) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const int  db_id,
const ForeignTable foreign_table 
)

Definition at line 40 of file CsvDataWrapper.cpp.

41  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const ForeignTable foreign_table)
private

Member Function Documentation

std::set< std::string_view > foreign_storage::CsvDataWrapper::getAllCsvTableOptions ( ) const
private

Definition at line 52 of file CsvDataWrapper.cpp.

References csv_table_options_, and foreign_storage::AbstractFileStorageDataWrapper::getSupportedTableOptions().

Referenced by getSupportedTableOptions().

52  {
53  std::set<std::string_view> supported_table_options(
56  supported_table_options.insert(csv_table_options_.begin(), csv_table_options_.end());
57  return supported_table_options;
58 }
static const std::set< std::string_view > csv_table_options_
const std::set< std::string_view > & getSupportedTableOptions() const override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::CsvDataWrapper::getCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 54 of file CsvDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

ParallelismLevel foreign_storage::CsvDataWrapper::getNonCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 56 of file CsvDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

const std::set< std::string_view > & foreign_storage::CsvDataWrapper::getSupportedTableOptions ( ) const
overridevirtual

Gets the set of supported table options for the data wrapper.

Reimplemented from foreign_storage::AbstractFileStorageDataWrapper.

Definition at line 47 of file CsvDataWrapper.cpp.

References getAllCsvTableOptions().

47  {
48  static const auto supported_table_options = getAllCsvTableOptions();
49  return supported_table_options;
50 }
std::set< std::string_view > getAllCsvTableOptions() const

+ Here is the call graph for this function:

bool foreign_storage::CsvDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1055 of file CsvDataWrapper.cpp.

References is_restored_.

1055  {
1056  return is_restored_;
1057 }
void foreign_storage::CsvDataWrapper::populateChunkBuffers ( const ChunkToBufferMap required_buffers,
const ChunkToBufferMap optional_buffers 
)
overridevirtual

Populates the given chunk buffers, identified by chunk keys. All provided chunk buffers are expected to belong to the same fragment.

Parameters
required_buffers- chunk buffers that must always be populated
optional_buffers- chunk buffers that can optionally be populated if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 97 of file CsvDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

98  {
99  auto timer = DEBUG_TIMER(__func__);
101  CHECK(catalog);
102  CHECK(!required_buffers.empty());
103 
104  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
105  std::set<const ColumnDescriptor*> required_columns =
106  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
107  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
109  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
110 
111  if (!optional_buffers.empty()) {
112  std::set<const ColumnDescriptor*> optional_columns;
113  optional_columns =
114  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
116  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
117  }
118  populateChunks(column_id_to_chunk_map, fragment_id);
119  updateMetadata(column_id_to_chunk_map, fragment_id);
120  for (auto& entry : column_id_to_chunk_map) {
121  entry.second.setBuffer(nullptr);
122  entry.second.setIndexBuffer(nullptr);
123  }
124 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static SysCatalog & instance()
Definition: SysCatalog.h:292
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 84 of file CsvDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::Csv::make_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

88  {
89  for (const auto column : columns) {
90  ChunkKey data_chunk_key = {
91  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
92  column_id_to_chunk_map[column->columnId] =
93  Csv::make_chunk_for_column(data_chunk_key, chunk_metadata_map_, buffers);
94  }
95 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
const ForeignTable * foreign_table_
Chunk_NS::Chunk make_chunk_for_column(const ChunkKey &chunk_key, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers)
Definition: CsvShared.cpp:183

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMetadata ( ChunkMetadataVector chunk_metadata_vector)
overridevirtual

Populates the provided chunk metadata vector with metadata for the foreign table. The metadata scan of the CSV file(s) configured for the foreign table runs in parallel whenever appropriate. In parallel processing, the main thread creates ParseBufferRequest objects, each containing a buffer of CSV content read from file, and adds them to a queue that is consumed by a fixed number of threads. After a request is processed, its object is returned to a pool and reused for subsequent requests, which avoids unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector- vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 838 of file CsvDataWrapper.cpp.

References foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::add_placeholder_metadata(), append_start_offset_, foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, foreign_storage::MetadataScanMultiThreadingParams::chunk_byte_count, chunk_byte_count_, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, csv_reader_, db_id_, DEBUG_TIMER, foreign_storage::dispatch_metadata_scan_requests(), foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_cache_if_enabled(), foreign_storage::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), i, Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, TableDescriptor::maxFragRows, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

838  {
839  auto timer = DEBUG_TIMER(__func__);
840 
841  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
842  const auto file_path = getFullFilePath(foreign_table_);
844  CHECK(catalog);
845  auto& server_options = foreign_table_->foreign_server->options;
846  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
847  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
848  csv_reader_->checkForMoreRows(append_start_offset_);
849  } else {
850  UNREACHABLE();
851  }
852  } else {
853  chunk_metadata_map_.clear();
855  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
856  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
857  } else {
858  UNREACHABLE();
859  }
860  num_rows_ = 0;
862  }
863 
864  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
865  foreign_table_->tableId, false, false, true);
866  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
867  for (auto column : columns) {
868  column_by_id[column->columnId] = column;
869  }
870  MetadataScanMultiThreadingParams multi_threading_params;
871 
872  // Restore previous chunk data
873  if (foreign_table_->isAppendMode()) {
874  multi_threading_params.chunk_byte_count = chunk_byte_count_;
875  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
876  }
877 
878  std::set<int> columns_to_scan;
879  for (auto column : columns) {
880  if (!skip_metadata_scan(column)) {
881  columns_to_scan.insert(column->columnId);
882  }
883  }
884 
885  // Track where scan started for appends
886  int start_row = num_rows_;
887  if (!csv_reader_->isScanFinished()) {
888  auto buffer_size = get_buffer_size(copy_params,
889  csv_reader_->isRemainingSizeKnown(),
890  csv_reader_->getRemainingSize());
891  auto thread_count = get_thread_count(copy_params,
892  csv_reader_->isRemainingSizeKnown(),
893  csv_reader_->getRemainingSize(),
894  buffer_size);
895  multi_threading_params.continue_processing = true;
896 
897  std::vector<std::future<void>> futures{};
898  for (size_t i = 0; i < thread_count; i++) {
899  multi_threading_params.request_pool.emplace(buffer_size,
900  copy_params,
901  db_id_,
903  columns_to_scan,
905 
906  futures.emplace_back(std::async(std::launch::async,
908  std::ref(multi_threading_params),
910  }
911 
912  try {
914  file_path,
915  (*csv_reader_),
916  copy_params,
917  multi_threading_params,
918  num_rows_,
920  } catch (...) {
921  {
922  std::unique_lock<std::mutex> pending_requests_lock(
923  multi_threading_params.pending_requests_mutex);
924  multi_threading_params.continue_processing = false;
925  }
926  multi_threading_params.pending_requests_condition.notify_all();
927  throw;
928  }
929 
930  for (auto& future : futures) {
931  // get() instead of wait() because we need to propagate potential exceptions.
932  future.get();
933  }
934  }
935 
936  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
937  auto chunk_metadata =
938  buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
939  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
940  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
941  chunk_metadata_map_[chunk_key] = chunk_metadata;
942  }
943 
944  for (auto column : columns) {
945  if (skip_metadata_scan(column)) {
947  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
948  }
949  }
950 
951  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
952  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
953  }
954 
955  // Save chunk data
956  if (foreign_table_->isAppendMode()) {
957  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
958  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
959  }
960 
961  // Any incomplete chunks should be cached now
962  auto cache = get_cache_if_enabled(catalog);
963  if (cache) {
964  std::vector<ChunkKey> to_cache;
965  for (auto& [chunk_key, buffer] : multi_threading_params.cached_chunks) {
966  if (buffer.getBuffer()->getEncoder()->getNumElems() !=
967  static_cast<size_t>(foreign_table_->maxFragRows)) {
968  if (column_by_id[chunk_key[CHUNK_KEY_COLUMN_IDX]]
969  ->columnType.is_varlen_indeed()) {
970  ChunkKey index_chunk_key = chunk_key;
971  index_chunk_key[4] = 2;
972  to_cache.push_back(chunk_key);
973  to_cache.push_back(index_chunk_key);
974  } else {
975  to_cache.push_back(chunk_key);
976  }
977  }
978  }
979  if (to_cache.size() > 0) {
980  cache->cacheTableChunks(to_cache);
981  }
982  }
983 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map- map of column id to chunks to be populated
fragment_id- fragment id of given chunks

Definition at line 267 of file CsvDataWrapper.cpp.

References CHECK, csv_reader_, test_fsi::d, db_id_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), i, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

Referenced by populateChunkBuffers().

269  {
270  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
271 
272  CHECK(!column_id_to_chunk_map.empty());
273  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
274  CHECK(!file_regions.empty());
275 
276  const auto buffer_size = get_buffer_size(file_regions);
277  const auto thread_count = get_thread_count(copy_params, file_regions);
278 
279  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
280 
281  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
282  parse_file_requests.reserve(thread_count);
283  std::vector<std::future<ParseFileRegionResult>> futures{};
284  std::set<int> column_filter_set;
285  for (const auto& pair : column_id_to_chunk_map) {
286  column_filter_set.insert(pair.first);
287  }
288 
289  std::vector<std::unique_ptr<CsvReader>> csv_readers;
290  rapidjson::Value reader_metadata(rapidjson::kObjectType);
291  rapidjson::Document d;
292  auto& server_options = foreign_table_->foreign_server->options;
293  csv_reader_->serialize(reader_metadata, d.GetAllocator());
294  const auto csv_file_path = getFullFilePath(foreign_table_);
295 
296  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
297  parse_file_requests.emplace_back(buffer_size,
298  copy_params,
299  db_id_,
301  column_filter_set,
302  csv_file_path);
303  auto start_index = i;
304  auto end_index =
305  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
306 
307  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
308  csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
309  csv_file_path, copy_params, reader_metadata));
310  } else {
311  UNREACHABLE();
312  }
313 
314  futures.emplace_back(std::async(std::launch::async,
316  std::ref(file_regions),
317  start_index,
318  end_index,
319  std::ref(*(csv_readers.back())),
320  std::ref(parse_file_requests.back()),
321  std::ref(column_id_to_chunk_map)));
322  }
323 
324  std::vector<ParseFileRegionResult> load_file_region_results{};
325  for (auto& future : futures) {
326  future.wait();
327  load_file_region_results.emplace_back(future.get());
328  }
329 
330  for (auto result : load_file_region_results) {
331  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
332  chunk.appendData(
333  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
334  }
335  }
336 }
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
tuple d
Definition: test_fsi.py:9
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:197
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path- location of the file created by serializeDataWrapperInternals
chunk_metadata- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1007 of file CsvDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_byte_count_, chunk_encoder_buffers_, chunk_metadata_map_, csv_reader_, test_fsi::d, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

1009  {
1010  auto d = json_utils::read_from_file(file_path);
1011  CHECK(d.IsObject());
1012 
1013  // Restore fragment map
1015  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1016 
1017  // Construct csv_reader with metadata
1018  CHECK(d.HasMember("reader_metadata"));
1019  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
1020  const auto csv_file_path = getFullFilePath(foreign_table_);
1021  auto& server_options = foreign_table_->foreign_server->options;
1022  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1023  csv_reader_ = std::make_unique<LocalMultiFileReader>(
1024  csv_file_path, copy_params, d["reader_metadata"]);
1025  } else {
1026  UNREACHABLE();
1027  }
1028 
1030  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1031 
1032  // Now restore the internal metadata maps
1033  CHECK(chunk_metadata_map_.empty());
1034  CHECK(chunk_encoder_buffers_.empty());
1035 
1036  for (auto& pair : chunk_metadata) {
1037  chunk_metadata_map_[pair.first] = pair.second;
1038 
1039  if (foreign_table_->isAppendMode()) {
1040  // Restore encoder state for append mode
1041  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1042  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1043  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1044  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1045  pair.second->numElements);
1046  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1047  pair.second->chunkStats);
1048  chunk_encoder_buffers_[pair.first]->setUpdated();
1049  chunk_byte_count_[pair.first] = pair.second->numBytes;
1050  }
1051  }
1052  is_restored_ = true;
1053 }
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
tuple d
Definition: test_fsi.py:9
#define UNREACHABLE()
Definition: Logger.h:241
std::unique_ptr< CsvReader > csv_reader_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:197
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
overridevirtual

Serializes the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path- location to save file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 985 of file CsvDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, csv_reader_, test_fsi::d, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_file().

985  {
986  rapidjson::Document d;
987  d.SetObject();
988 
989  // Save fragment map
992  "fragment_id_to_file_regions_map",
993  d.GetAllocator());
994 
995  // Save csv_reader metadata
996  rapidjson::Value reader_metadata(rapidjson::kObjectType);
997  csv_reader_->serialize(reader_metadata, d.GetAllocator());
998  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
999 
1000  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1002  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1003 
1004  json_utils::write_to_file(d, file_path);
1005 }
tuple d
Definition: test_fsi.py:9
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 127 of file CsvDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, TableDescriptor::fragmenter, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

129  {
130  auto fragmenter = foreign_table_->fragmenter;
131  if (fragmenter) {
133  CHECK(catalog);
134  for (auto& entry : column_id_to_chunk_map) {
135  const auto& column =
136  catalog->getMetadataForColumnUnlocked(foreign_table_->tableId, entry.first);
137  if (skip_metadata_scan(column)) {
138  ChunkKey data_chunk_key = {
139  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
140  if (column->columnType.is_varlen_indeed()) {
141  data_chunk_key.emplace_back(1);
142  }
143  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
144  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
145  auto chunk_metadata =
146  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
147  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
148  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
149  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
150  cached_metadata->numBytes = entry.second.getBuffer()->size();
151  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
152  }
153  }
154  }
155 }
std::vector< int > ChunkKey
Definition: types.h:37
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::validateTableOptions ( const ForeignTable foreign_table) const
overridevirtual

Checks that the options for the given foreign table object are valid.

Parameters
foreign_table- foreign table object containing options to be validated

Reimplemented from foreign_storage::AbstractFileStorageDataWrapper.

Definition at line 43 of file CsvDataWrapper.cpp.

References foreign_storage::Csv::validate_options(), and foreign_storage::AbstractFileStorageDataWrapper::validateTableOptions().

43  {
45  Csv::validate_options(foreign_table);
46 }
void validateTableOptions(const ForeignTable *foreign_table) const override
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:121

+ Here is the call graph for this function:

Member Data Documentation

size_t foreign_storage::CsvDataWrapper::append_start_offset_
private
std::map<ChunkKey, size_t> foreign_storage::CsvDataWrapper::chunk_byte_count_
private

Definition at line 93 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::CsvDataWrapper::chunk_encoder_buffers_
private

Definition at line 92 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::CsvDataWrapper::chunk_metadata_map_
private
std::unique_ptr<CsvReader> foreign_storage::CsvDataWrapper::csv_reader_
private
const std::set< std::string_view > foreign_storage::CsvDataWrapper::csv_table_options_
staticprivate
Initial value:
{"ARRAY_DELIMITER",
"ARRAY_MARKER",
"BUFFER_SIZE",
"DELIMITER",
"ESCAPE",
"HEADER",
"LINE_DELIMITER",
"LONLAT",
"NULLS",
"QUOTE",
"QUOTED",
"S3_ACCESS_TYPE"}

Definition at line 100 of file CsvDataWrapper.h.

Referenced by getAllCsvTableOptions().

const int foreign_storage::CsvDataWrapper::db_id_
private
const ForeignTable* foreign_storage::CsvDataWrapper::foreign_table_
private
std::map<int, FileRegions> foreign_storage::CsvDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::CsvDataWrapper::is_restored_
private

Definition at line 99 of file CsvDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::CsvDataWrapper::num_rows_
private

The documentation for this class was generated from the following files: