OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::AbstractTextFileDataWrapper Class Reference (abstract)

#include <AbstractTextFileDataWrapper.h>

+ Inheritance diagram for foreign_storage::AbstractTextFileDataWrapper:
+ Collaboration diagram for foreign_storage::AbstractTextFileDataWrapper:

Public Member Functions

 AbstractTextFileDataWrapper ()
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Protected Member Functions

virtual const TextFileBufferParser & getFileBufferParser () const =0
 

Private Member Functions

 AbstractTextFileDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< FileReader > file_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 32 of file AbstractTextFileDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( )
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const ForeignTable *  foreign_table)
private

Member Function Documentation

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 49 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

virtual const TextFileBufferParser& foreign_storage::AbstractTextFileDataWrapper::getFileBufferParser ( ) const
protected pure virtual

Implemented in foreign_storage::CsvDataWrapper, and foreign_storage::RegexParserDataWrapper.

Referenced by populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 51 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::string foreign_storage::AbstractTextFileDataWrapper::getSerializedDataWrapper ( ) const
override virtual

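Serializes the internal state of the wrapper and returns it as a JSON string. The state comprises the fragment-to-file-regions map, the file reader metadata, the row count and the append start offset.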

Implements foreign_storage::ForeignDataWrapper.

Definition at line 932 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, file_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_string().

932  {
933  rapidjson::Document d;
934  d.SetObject();
935 
936  // Save fragment map
939  "fragment_id_to_file_regions_map",
940  d.GetAllocator());
941 
942  // Save reader metadata
943  rapidjson::Value reader_metadata(rapidjson::kObjectType);
944  file_reader_->serialize(reader_metadata, d.GetAllocator());
945  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
946 
947  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
949  d, append_start_offset_, "append_start_offset", d.GetAllocator());
950 
951  return json_utils::write_to_string(d);
952 }
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

bool foreign_storage::AbstractTextFileDataWrapper::isRestored ( ) const
override virtual
void foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers ( const ChunkToBufferMap &  required_buffers,
const ChunkToBufferMap &  optional_buffers 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that may optionally be populated when the data wrapper has to scan through the chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 78 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

80  {
81  auto timer = DEBUG_TIMER(__func__);
83  CHECK(catalog);
84  CHECK(!required_buffers.empty());
85 
86  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
87  auto required_columns =
88  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
89  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
91  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
92 
93  if (!optional_buffers.empty()) {
94  auto optional_columns =
95  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
97  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
98  }
99  populateChunks(column_id_to_chunk_map, fragment_id);
100  updateMetadata(column_id_to_chunk_map, fragment_id);
101  for (auto& entry : column_id_to_chunk_map) {
102  entry.second.setBuffer(nullptr);
103  entry.second.setIndexBuffer(nullptr);
104  }
105 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
static SysCatalog & instance()
Definition: SysCatalog.h:325
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap &  buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 63 of file AbstractTextFileDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::init_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

67  {
68  for (const auto column : columns) {
69  ChunkKey data_chunk_key = {
70  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
71  init_chunk_for_column(data_chunk_key,
73  buffers,
74  column_id_to_chunk_map[column->columnId]);
75  }
76 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata ( ChunkMetadataVector &  chunk_metadata_vector)
override virtual

Populates provided chunk metadata vector with metadata for table specified in given chunk key. Metadata scan for text file(s) configured for foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with text content read from file and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse for subsequent requests in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector- vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 784 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata(), append_start_offset_, threading_serial::async(), foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, CHECK_EQ, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::dispatch_metadata_scan_requests(), file_reader_, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_ORDER_BY_KEY, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_REGEX_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::OptionsContainer::getOption(), i, Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

785  {
786  auto timer = DEBUG_TIMER(__func__);
787 
788  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
789  const auto file_path = getFullFilePath(foreign_table_);
791  CHECK(catalog);
792  auto& parser = getFileBufferParser();
793  auto& server_options = foreign_table_->foreign_server->options;
794  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
795  parser.validateFiles(file_reader_.get(), foreign_table_);
796  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
797  file_reader_->checkForMoreRows(append_start_offset_);
798  } else {
799  UNREACHABLE();
800  }
801  } else {
802  // Should only be called once for non-append tables
803  CHECK(chunk_metadata_map_.empty());
805  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
806  file_reader_ = std::make_unique<LocalMultiFileReader>(
807  file_path,
808  copy_params,
814  } else {
815  UNREACHABLE();
816  }
817  parser.validateFiles(file_reader_.get(), foreign_table_);
818  num_rows_ = 0;
820  }
821 
822  auto columns =
823  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
824  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
825  for (auto column : columns) {
826  column_by_id[column->columnId] = column;
827  }
828  MetadataScanMultiThreadingParams multi_threading_params;
829 
830  // Restore previous chunk data
831  if (foreign_table_->isAppendMode()) {
832  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
833  }
834 
835  std::set<int> columns_to_scan;
836  for (auto column : columns) {
837  if (!skip_metadata_scan(column)) {
838  columns_to_scan.insert(column->columnId);
839  }
840  }
841 
842  // Track where scan started for appends
843  int start_row = num_rows_;
844  if (!file_reader_->isScanFinished()) {
845  auto buffer_size = get_buffer_size(copy_params,
846  file_reader_->isRemainingSizeKnown(),
847  file_reader_->getRemainingSize());
848  auto thread_count = get_thread_count(copy_params,
849  file_reader_->isRemainingSizeKnown(),
850  file_reader_->getRemainingSize(),
851  buffer_size);
852  multi_threading_params.continue_processing = true;
853 
854  std::vector<std::future<void>> futures{};
855  for (size_t i = 0; i < thread_count; i++) {
856  multi_threading_params.request_pool.emplace(buffer_size,
857  copy_params,
858  db_id_,
860  columns_to_scan,
862 
863  futures.emplace_back(std::async(std::launch::async,
865  std::ref(multi_threading_params),
867  std::ref(parser)));
868  }
869 
870  try {
872  file_path,
873  (*file_reader_),
874  copy_params,
875  multi_threading_params,
876  num_rows_,
879  } catch (...) {
880  {
881  std::unique_lock<std::mutex> pending_requests_lock(
882  multi_threading_params.pending_requests_mutex);
883  multi_threading_params.continue_processing = false;
884  }
885  multi_threading_params.pending_requests_condition.notify_all();
886  throw;
887  }
888 
889  for (auto& future : futures) {
890  // get() instead of wait() because we need to propagate potential exceptions.
891  future.get();
892  }
893  }
894 
895  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
896  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
897  CHECK(column_entry != column_by_id.end());
898  const auto& column_type = column_entry->second->columnType;
899  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
900  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
901  const auto& cached_chunks = multi_threading_params.cached_chunks;
902  if (!column_type.is_varlen_indeed()) {
903  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
904  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
905  chunk_entry != cached_chunks.end()) {
906  auto buffer = chunk_entry->second.getBuffer();
907  CHECK(buffer);
908  chunk_metadata->numBytes = buffer->size();
909  } else {
910  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
911  }
912  chunk_metadata_map_[chunk_key] = chunk_metadata;
913  }
914 
915  for (auto column : columns) {
916  if (skip_metadata_scan(column)) {
918  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
919  }
920  }
921 
922  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
923  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
924  }
925 
926  // Save chunk data
927  if (foreign_table_->isAppendMode()) {
928  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
929  }
930 }
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser)
#define CHECK_EQ(x, y)
Definition: Logger.h:217
#define UNREACHABLE()
Definition: Logger.h:253
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:325
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::optional< std::string > getOption(const std::string_view &key) const
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map- map of column id to chunks to be populated
fragment_id- fragment id of given chunks

Definition at line 251 of file AbstractTextFileDataWrapper.cpp.

References threading_serial::async(), CHECK, db_id_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), i, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::parser, run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunkBuffers().

253  {
254  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
255 
256  CHECK(!column_id_to_chunk_map.empty());
257  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
258  CHECK(!file_regions.empty());
259 
260  const auto buffer_size = get_buffer_size(file_regions);
261  const auto thread_count = get_thread_count(copy_params, file_regions);
262 
263  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
264 
265  std::vector<ParseBufferRequest> parse_file_requests{};
266  parse_file_requests.reserve(thread_count);
267  std::vector<std::future<ParseFileRegionResult>> futures{};
268  std::set<int> column_filter_set;
269  for (const auto& pair : column_id_to_chunk_map) {
270  column_filter_set.insert(pair.first);
271  }
272 
273  std::vector<std::unique_ptr<FileReader>> file_readers;
274  rapidjson::Value reader_metadata(rapidjson::kObjectType);
275  rapidjson::Document d;
276  auto& server_options = foreign_table_->foreign_server->options;
277  file_reader_->serialize(reader_metadata, d.GetAllocator());
278  const auto file_path = getFullFilePath(foreign_table_);
279  auto& parser = getFileBufferParser();
280 
281  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
282  parse_file_requests.emplace_back(
283  buffer_size, copy_params, db_id_, foreign_table_, column_filter_set, file_path);
284  auto start_index = i;
285  auto end_index =
286  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
287 
288  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
289  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
290  file_path, copy_params, reader_metadata));
291  } else {
292  UNREACHABLE();
293  }
294 
295  futures.emplace_back(std::async(std::launch::async,
297  std::ref(file_regions),
298  start_index,
299  end_index,
300  std::ref(*(file_readers.back())),
301  std::ref(parse_file_requests.back()),
302  std::ref(column_id_to_chunk_map),
303  std::ref(parser)));
304  }
305 
306  for (auto& future : futures) {
307  future.wait();
308  }
309 
310  std::vector<ParseFileRegionResult> load_file_region_results{};
311  for (auto& future : futures) {
312  load_file_region_results.emplace_back(future.get());
313  }
314 
315  for (auto result : load_file_region_results) {
316  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
317  chunk.appendData(
318  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
319  }
320  }
321 }
#define UNREACHABLE()
Definition: Logger.h:253
virtual const TextFileBufferParser & getFileBufferParser() const =0
future< Result > async(Fn &&fn, Args &&...args)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
#define CHECK(condition)
Definition: Logger.h:209
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
override virtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the JSON file holding the serialized wrapper state (the same keys written by getSerializedDataWrapper())
chunk_metadata - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 954 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

956  {
957  auto d = json_utils::read_from_file(file_path);
958  CHECK(d.IsObject());
959 
960  // Restore fragment map
962  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
963 
964  // Construct reader with metadta
965  CHECK(d.HasMember("reader_metadata"));
966  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
967  const auto full_file_path = getFullFilePath(foreign_table_);
968  auto& server_options = foreign_table_->foreign_server->options;
969  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
970  file_reader_ = std::make_unique<LocalMultiFileReader>(
971  full_file_path, copy_params, d["reader_metadata"]);
972  } else {
973  UNREACHABLE();
974  }
975 
977  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
978 
979  // Now restore the internal metadata maps
980  CHECK(chunk_metadata_map_.empty());
981  CHECK(chunk_encoder_buffers_.empty());
982 
983  for (auto& pair : chunk_metadata) {
984  chunk_metadata_map_[pair.first] = pair.second;
985 
986  if (foreign_table_->isAppendMode()) {
987  // Restore encoder state for append mode
988  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
989  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
990  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
991  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
992  pair.second->numElements);
993  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
994  pair.second->chunkStats);
995  chunk_encoder_buffers_[pair.first]->setUpdated();
996  }
997  }
998  is_restored_ = true;
999 }
#define UNREACHABLE()
Definition: Logger.h:253
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
virtual const TextFileBufferParser & getFileBufferParser() const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
rapidjson::Document read_from_file(const std::string &file_path)
bool isAppendMode() const
Checks if the table is in append mode.
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
#define CHECK(condition)
Definition: Logger.h:209
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 108 of file AbstractTextFileDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, TableDescriptor::fragmenter, shared::get_from_map(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

110  {
111  auto fragmenter = foreign_table_->fragmenter;
112  if (fragmenter) {
114  CHECK(catalog);
115  for (auto& entry : column_id_to_chunk_map) {
116  const auto& column =
117  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
118  if (skip_metadata_scan(column)) {
119  ChunkKey data_chunk_key = {
120  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
121  if (column->columnType.is_varlen_indeed()) {
122  data_chunk_key.emplace_back(1);
123  }
124  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
125  // Allocate new shared_ptr for metadata so we dont modify old one which may be
126  // used by executor
127  auto cached_metadata_previous =
128  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
129  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
130  std::make_shared<ChunkMetadata>();
131  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
132  *cached_metadata = *cached_metadata_previous;
133  auto chunk_metadata =
134  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
135  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
136  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
137  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
138  cached_metadata->numBytes = entry.second.getBuffer()->size();
139  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
140  }
141  }
142  }
143 }
std::vector< int > ChunkKey
Definition: types.h:37
static SysCatalog & instance()
Definition: SysCatalog.h:325
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V > &map, const K &key)
Definition: misc.h:58
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::AbstractTextFileDataWrapper::append_start_offset_
private
std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::AbstractTextFileDataWrapper::chunk_encoder_buffers_
private
std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::AbstractTextFileDataWrapper::chunk_metadata_map_
private
const int foreign_storage::AbstractTextFileDataWrapper::db_id_
private
std::unique_ptr<FileReader> foreign_storage::AbstractTextFileDataWrapper::file_reader_
private
const ForeignTable* foreign_storage::AbstractTextFileDataWrapper::foreign_table_
private
std::map<int, FileRegions> foreign_storage::AbstractTextFileDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_restored_
private

Definition at line 94 of file AbstractTextFileDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::AbstractTextFileDataWrapper::num_rows_
private

The documentation for this class was generated from the following files: