OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::CsvDataWrapper Class Reference

#include <CsvDataWrapper.h>

+ Inheritance diagram for foreign_storage::CsvDataWrapper:
+ Collaboration diagram for foreign_storage::CsvDataWrapper:

Public Member Functions

 CsvDataWrapper ()
 
 CsvDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

 CsvDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
std::set< std::string_view > getAllCsvTableOptions () const
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< CsvReader > csv_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 

Static Private Attributes

static const std::set
< std::string_view > 
csv_table_options_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options, this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 33 of file CsvDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::CsvDataWrapper::CsvDataWrapper ( )

Definition at line 38 of file CsvDataWrapper.cpp.

38 : db_id_(-1), foreign_table_(nullptr) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const int  db_id,
const ForeignTable * foreign_table
)

Definition at line 40 of file CsvDataWrapper.cpp.

41  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
const ForeignTable * foreign_table_
foreign_storage::CsvDataWrapper::CsvDataWrapper ( const ForeignTable * foreign_table)
private

Member Function Documentation

std::set< std::string_view > foreign_storage::CsvDataWrapper::getAllCsvTableOptions ( ) const
private

Definition at line 52 of file CsvDataWrapper.cpp.

References csv_table_options_, and foreign_storage::AbstractFileStorageDataWrapper::getSupportedTableOptions().

Referenced by getSupportedTableOptions().

52  {
53  std::set<std::string_view> supported_table_options(
56  supported_table_options.insert(csv_table_options_.begin(), csv_table_options_.end());
57  return supported_table_options;
58 }
static const std::set< std::string_view > csv_table_options_
const std::set< std::string_view > & getSupportedTableOptions() const override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::CsvDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 54 of file CsvDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

ParallelismLevel foreign_storage::CsvDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 56 of file CsvDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

const std::set< std::string_view > & foreign_storage::CsvDataWrapper::getSupportedTableOptions ( ) const
override virtual

Gets the set of supported table options for the data wrapper.

Reimplemented from foreign_storage::AbstractFileStorageDataWrapper.

Definition at line 47 of file CsvDataWrapper.cpp.

References getAllCsvTableOptions().

47  {
48  static const auto supported_table_options = getAllCsvTableOptions();
49  return supported_table_options;
50 }
std::set< std::string_view > getAllCsvTableOptions() const

+ Here is the call graph for this function:

bool foreign_storage::CsvDataWrapper::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1031 of file CsvDataWrapper.cpp.

References is_restored_.

1031  {
1032  return is_restored_;
1033 }
void foreign_storage::CsvDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can optionally be populated if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 97 of file CsvDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

98  {
99  auto timer = DEBUG_TIMER(__func__);
101  CHECK(catalog);
102  CHECK(!required_buffers.empty());
103 
104  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
105  std::set<const ColumnDescriptor*> required_columns =
106  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
107  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
109  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
110 
111  if (!optional_buffers.empty()) {
112  std::set<const ColumnDescriptor*> optional_columns;
113  optional_columns =
114  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
116  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
117  }
118  populateChunks(column_id_to_chunk_map, fragment_id);
119  updateMetadata(column_id_to_chunk_map, fragment_id);
120  for (auto& entry : column_id_to_chunk_map) {
121  entry.second.setBuffer(nullptr);
122  entry.second.setIndexBuffer(nullptr);
123  }
124 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static SysCatalog & instance()
Definition: SysCatalog.h:292
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap & buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 84 of file CsvDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::Csv::make_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

88  {
89  for (const auto column : columns) {
90  ChunkKey data_chunk_key = {
91  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
92  column_id_to_chunk_map[column->columnId] =
93  Csv::make_chunk_for_column(data_chunk_key, chunk_metadata_map_, buffers);
94  }
95 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
const ForeignTable * foreign_table_
Chunk_NS::Chunk make_chunk_for_column(const ChunkKey &chunk_key, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers)
Definition: CsvShared.cpp:183

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
override virtual

Populates the provided chunk metadata vector with metadata for this wrapper's foreign table. The metadata scan of the CSV file(s) configured for the foreign table runs in parallel whenever appropriate. In parallel processing, the main thread creates ParseBufferRequest objects, which contain buffers of CSV content read from file. It adds these request objects to a queue that a fixed number of threads consume. After a request is processed, the request object is returned to a pool for reuse by subsequent requests, which avoids unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 804 of file CsvDataWrapper.cpp.

References foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::add_placeholder_metadata(), append_start_offset_, foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, CHECK_EQ, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, csv_reader_, db_id_, DEBUG_TIMER, foreign_storage::dispatch_metadata_scan_requests(), foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::get_cache_if_enabled(), foreign_storage::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), i, Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, TableDescriptor::maxFragRows, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

804  {
805  auto timer = DEBUG_TIMER(__func__);
806 
807  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
808  const auto file_path = getFullFilePath(foreign_table_);
810  CHECK(catalog);
811  auto& server_options = foreign_table_->foreign_server->options;
812  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
813  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
814  csv_reader_->checkForMoreRows(append_start_offset_);
815  } else {
816  UNREACHABLE();
817  }
818  } else {
819  // Should only be called once for non-append tables
820  CHECK(chunk_metadata_map_.empty());
822  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
823  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
824  } else {
825  UNREACHABLE();
826  }
827  num_rows_ = 0;
829  }
830 
831  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
832  foreign_table_->tableId, false, false, true);
833  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
834  for (auto column : columns) {
835  column_by_id[column->columnId] = column;
836  }
837  MetadataScanMultiThreadingParams multi_threading_params;
838 
839  // Restore previous chunk data
840  if (foreign_table_->isAppendMode()) {
841  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
842  }
843 
844  std::set<int> columns_to_scan;
845  for (auto column : columns) {
846  if (!skip_metadata_scan(column)) {
847  columns_to_scan.insert(column->columnId);
848  }
849  }
850 
851  // Track where scan started for appends
852  int start_row = num_rows_;
853  if (!csv_reader_->isScanFinished()) {
854  auto buffer_size = get_buffer_size(copy_params,
855  csv_reader_->isRemainingSizeKnown(),
856  csv_reader_->getRemainingSize());
857  auto thread_count = get_thread_count(copy_params,
858  csv_reader_->isRemainingSizeKnown(),
859  csv_reader_->getRemainingSize(),
860  buffer_size);
861  multi_threading_params.continue_processing = true;
862 
863  std::vector<std::future<void>> futures{};
864  for (size_t i = 0; i < thread_count; i++) {
865  multi_threading_params.request_pool.emplace(buffer_size,
866  copy_params,
867  db_id_,
869  columns_to_scan,
871 
872  futures.emplace_back(std::async(std::launch::async,
874  std::ref(multi_threading_params),
876  }
877 
878  try {
880  file_path,
881  (*csv_reader_),
882  copy_params,
883  multi_threading_params,
884  num_rows_,
886  } catch (...) {
887  {
888  std::unique_lock<std::mutex> pending_requests_lock(
889  multi_threading_params.pending_requests_mutex);
890  multi_threading_params.continue_processing = false;
891  }
892  multi_threading_params.pending_requests_condition.notify_all();
893  throw;
894  }
895 
896  for (auto& future : futures) {
897  // get() instead of wait() because we need to propagate potential exceptions.
898  future.get();
899  }
900  }
901 
902  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
903  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
904  CHECK(column_entry != column_by_id.end());
905  const auto& column_type = column_entry->second->columnType;
906  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
907  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
908  const auto& cached_chunks = multi_threading_params.cached_chunks;
909  if (!column_type.is_varlen_indeed()) {
910  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
911  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
912  chunk_entry != cached_chunks.end()) {
913  auto buffer = chunk_entry->second.getBuffer();
914  CHECK(buffer);
915  chunk_metadata->numBytes = buffer->size();
916  } else {
917  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
918  }
919  chunk_metadata_map_[chunk_key] = chunk_metadata;
920  }
921 
922  for (auto column : columns) {
923  if (skip_metadata_scan(column)) {
925  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
926  }
927  }
928 
929  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
930  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
931  }
932 
933  // Save chunk data
934  if (foreign_table_->isAppendMode()) {
935  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
936  }
937 
938  // Any incomplete chunks should be cached now
939  auto cache = get_cache_if_enabled(catalog);
940  if (cache) {
941  std::vector<ChunkKey> to_cache;
942  for (auto& [chunk_key, buffer] : multi_threading_params.cached_chunks) {
943  if (buffer.getBuffer()->getEncoder()->getNumElems() !=
944  static_cast<size_t>(foreign_table_->maxFragRows)) {
945  if (column_by_id[chunk_key[CHUNK_KEY_COLUMN_IDX]]
946  ->columnType.is_varlen_indeed()) {
947  ChunkKey index_chunk_key = chunk_key;
948  index_chunk_key[4] = 2;
949  to_cache.push_back(chunk_key);
950  to_cache.push_back(index_chunk_key);
951  } else {
952  to_cache.push_back(chunk_key);
953  }
954  }
955  }
956  if (to_cache.size() > 0) {
957  cache->cacheTableChunks(to_cache);
958  }
959  }
960 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define UNREACHABLE()
Definition: Logger.h:247
std::unique_ptr< CsvReader > csv_reader_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map - map of column id to chunks to be populated
fragment_id - fragment id of the given chunks

Definition at line 267 of file CsvDataWrapper.cpp.

References CHECK, csv_reader_, test_fsi::d, db_id_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), i, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

Referenced by populateChunkBuffers().

269  {
270  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
271 
272  CHECK(!column_id_to_chunk_map.empty());
273  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
274  CHECK(!file_regions.empty());
275 
276  const auto buffer_size = get_buffer_size(file_regions);
277  const auto thread_count = get_thread_count(copy_params, file_regions);
278 
279  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
280 
281  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
282  parse_file_requests.reserve(thread_count);
283  std::vector<std::future<ParseFileRegionResult>> futures{};
284  std::set<int> column_filter_set;
285  for (const auto& pair : column_id_to_chunk_map) {
286  column_filter_set.insert(pair.first);
287  }
288 
289  std::vector<std::unique_ptr<CsvReader>> csv_readers;
290  rapidjson::Value reader_metadata(rapidjson::kObjectType);
291  rapidjson::Document d;
292  auto& server_options = foreign_table_->foreign_server->options;
293  csv_reader_->serialize(reader_metadata, d.GetAllocator());
294  const auto csv_file_path = getFullFilePath(foreign_table_);
295 
296  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
297  parse_file_requests.emplace_back(buffer_size,
298  copy_params,
299  db_id_,
301  column_filter_set,
302  csv_file_path);
303  auto start_index = i;
304  auto end_index =
305  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
306 
307  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
308  csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
309  csv_file_path, copy_params, reader_metadata));
310  } else {
311  UNREACHABLE();
312  }
313 
314  futures.emplace_back(std::async(std::launch::async,
316  std::ref(file_regions),
317  start_index,
318  end_index,
319  std::ref(*(csv_readers.back())),
320  std::ref(parse_file_requests.back()),
321  std::ref(column_id_to_chunk_map)));
322  }
323 
324  std::vector<ParseFileRegionResult> load_file_region_results{};
325  for (auto& future : futures) {
326  future.wait();
327  load_file_region_results.emplace_back(future.get());
328  }
329 
330  for (auto result : load_file_region_results) {
331  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
332  chunk.appendData(
333  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
334  }
335  }
336 }
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
tuple d
Definition: test_fsi.py:9
#define UNREACHABLE()
Definition: Logger.h:247
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:203
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata 
)
override virtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeDataWrapperInternals
chunk_metadata - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 984 of file CsvDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, csv_reader_, test_fsi::d, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::Csv::validate_and_get_copy_params().

986  {
987  auto d = json_utils::read_from_file(file_path);
988  CHECK(d.IsObject());
989 
990  // Restore fragment map
992  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
993 
994  // Construct csv_reader with metadta
995  CHECK(d.HasMember("reader_metadata"));
996  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
997  const auto csv_file_path = getFullFilePath(foreign_table_);
998  auto& server_options = foreign_table_->foreign_server->options;
999  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1000  csv_reader_ = std::make_unique<LocalMultiFileReader>(
1001  csv_file_path, copy_params, d["reader_metadata"]);
1002  } else {
1003  UNREACHABLE();
1004  }
1005 
1007  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1008 
1009  // Now restore the internal metadata maps
1010  CHECK(chunk_metadata_map_.empty());
1011  CHECK(chunk_encoder_buffers_.empty());
1012 
1013  for (auto& pair : chunk_metadata) {
1014  chunk_metadata_map_[pair.first] = pair.second;
1015 
1016  if (foreign_table_->isAppendMode()) {
1017  // Restore encoder state for append mode
1018  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1019  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1020  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1021  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1022  pair.second->numElements);
1023  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1024  pair.second->chunkStats);
1025  chunk_encoder_buffers_[pair.first]->setUpdated();
1026  }
1027  }
1028  is_restored_ = true;
1029 }
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
tuple d
Definition: test_fsi.py:9
#define UNREACHABLE()
Definition: Logger.h:247
std::unique_ptr< CsvReader > csv_reader_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
std::map< int, FileRegions > fragment_id_to_file_regions_map_
bool isAppendMode() const
Checks if the table is in append mode.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:203
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
override virtual

Serializes the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path - location to save the file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 962 of file CsvDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, csv_reader_, test_fsi::d, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_file().

962  {
963  rapidjson::Document d;
964  d.SetObject();
965 
966  // Save fragment map
969  "fragment_id_to_file_regions_map",
970  d.GetAllocator());
971 
972  // Save csv_reader metadata
973  rapidjson::Value reader_metadata(rapidjson::kObjectType);
974  csv_reader_->serialize(reader_metadata, d.GetAllocator());
975  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
976 
977  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
979  d, append_start_offset_, "append_start_offset", d.GetAllocator());
980 
981  json_utils::write_to_file(d, file_path);
982 }
tuple d
Definition: test_fsi.py:9
std::unique_ptr< CsvReader > csv_reader_
std::map< int, FileRegions > fragment_id_to_file_regions_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)

+ Here is the call graph for this function:

void foreign_storage::CsvDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 127 of file CsvDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, TableDescriptor::fragmenter, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{CsvDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

129  {
130  auto fragmenter = foreign_table_->fragmenter;
131  if (fragmenter) {
133  CHECK(catalog);
134  for (auto& entry : column_id_to_chunk_map) {
135  const auto& column =
136  catalog->getMetadataForColumnUnlocked(foreign_table_->tableId, entry.first);
137  if (skip_metadata_scan(column)) {
138  ChunkKey data_chunk_key = {
139  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
140  if (column->columnType.is_varlen_indeed()) {
141  data_chunk_key.emplace_back(1);
142  }
143  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
144  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
145  auto chunk_metadata =
146  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
147  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
148  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
149  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
150  cached_metadata->numBytes = entry.second.getBuffer()->size();
151  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
152  }
153  }
154  }
155 }
std::vector< int > ChunkKey
Definition: types.h:37
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::CsvDataWrapper::validateTableOptions ( const ForeignTable * foreign_table) const
override virtual

Checks that the options for the given foreign table object are valid.

Parameters
foreign_table - foreign table object containing options to be validated

Reimplemented from foreign_storage::AbstractFileStorageDataWrapper.

Definition at line 43 of file CsvDataWrapper.cpp.

References foreign_storage::Csv::validate_options(), and foreign_storage::AbstractFileStorageDataWrapper::validateTableOptions().

43  {
45  Csv::validate_options(foreign_table);
46 }
void validateTableOptions(const ForeignTable *foreign_table) const override
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:121

+ Here is the call graph for this function:

Member Data Documentation

size_t foreign_storage::CsvDataWrapper::append_start_offset_
private
std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::CsvDataWrapper::chunk_encoder_buffers_
private

Definition at line 92 of file CsvDataWrapper.h.

Referenced by populateChunkMetadata(), and restoreDataWrapperInternals().

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::CsvDataWrapper::chunk_metadata_map_
private
std::unique_ptr<CsvReader> foreign_storage::CsvDataWrapper::csv_reader_
private
const std::set< std::string_view > foreign_storage::CsvDataWrapper::csv_table_options_
static private
Initial value:
{"ARRAY_DELIMITER",
"ARRAY_MARKER",
"BUFFER_SIZE",
"DELIMITER",
"ESCAPE",
"HEADER",
"LINE_DELIMITER",
"LONLAT",
"NULLS",
"QUOTE",
"QUOTED",
"S3_ACCESS_TYPE"}

Definition at line 99 of file CsvDataWrapper.h.

Referenced by getAllCsvTableOptions().

const int foreign_storage::CsvDataWrapper::db_id_
private
const ForeignTable* foreign_storage::CsvDataWrapper::foreign_table_
private
std::map<int, FileRegions> foreign_storage::CsvDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::CsvDataWrapper::is_restored_
private

Definition at line 98 of file CsvDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::CsvDataWrapper::num_rows_
private

The documentation for this class was generated from the following files: