OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::AbstractTextFileDataWrapper Class Reference (abstract)

#include <AbstractTextFileDataWrapper.h>

+ Inheritance diagram for foreign_storage::AbstractTextFileDataWrapper:
+ Collaboration diagram for foreign_storage::AbstractTextFileDataWrapper:

Public Member Functions

 AbstractTextFileDataWrapper ()
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping, const bool disable_cache)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
void createRenderGroupAnalyzers () override
 Create RenderGroupAnalyzers for poly columns. More...
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual const std::set
< std::string > 
getAlterableTableOptions () const
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Protected Member Functions

virtual const
TextFileBufferParser & 
getFileBufferParser () const =0
 

Private Member Functions

 AbstractTextFileDataWrapper (const ForeignTable *foreign_table)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< FileReader > file_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 
const UserMapping * user_mapping_
 
const bool disable_cache_
 
RenderGroupAnalyzerMap render_group_analyzer_map_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 32 of file AbstractTextFileDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( )
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping,
const bool  disable_cache 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const ForeignTable * foreign_table)
private

Member Function Documentation

void foreign_storage::AbstractTextFileDataWrapper::createRenderGroupAnalyzers ( )
override virtual

Create RenderGroupAnalyzers for poly columns.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 1071 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_GE, db_id_, foreign_table_, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), IS_GEO_POLY, render_group_analyzer_map_, and TableDescriptor::tableId.

1071  {
1072  // must have these
1073  CHECK_GE(db_id_, 0);
1075 
1076  // populate map for all poly columns in this table
1078  CHECK(catalog);
1079  auto columns =
1080  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1081  for (auto const& column : columns) {
1082  if (IS_GEO_POLY(column->columnType.get_type())) {
1084  .try_emplace(column->columnId,
1085  std::make_unique<import_export::RenderGroupAnalyzer>())
1086  .second);
1087  }
1088  }
1089 }
#define CHECK_GE(x, y)
Definition: Logger.h:236
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:223
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 55 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

virtual const TextFileBufferParser& foreign_storage::AbstractTextFileDataWrapper::getFileBufferParser ( ) const
protected pure virtual

Implemented in foreign_storage::CsvDataWrapper, and foreign_storage::RegexParserDataWrapper.

Referenced by populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 57 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::string foreign_storage::AbstractTextFileDataWrapper::getSerializedDataWrapper ( ) const
override virtual

Serializes the internal state of the wrapper (if implemented) and returns it as a string.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 995 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, file_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_string().

995  {
996  rapidjson::Document d;
997  d.SetObject();
998 
999  // Save fragment map
1002  "fragment_id_to_file_regions_map",
1003  d.GetAllocator());
1004 
1005  // Save reader metadata
1006  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1007  file_reader_->serialize(reader_metadata, d.GetAllocator());
1008  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1009 
1010  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1012  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1013 
1014  return json_utils::write_to_string(d);
1015 }
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

bool foreign_storage::AbstractTextFileDataWrapper::isRestored ( ) const
override virtual
void foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 98 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

101  {
102  auto timer = DEBUG_TIMER(__func__);
104  CHECK(catalog);
105  CHECK(!required_buffers.empty());
106 
107  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
108  auto required_columns =
109  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
110  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
112  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
113 
114  if (!optional_buffers.empty()) {
115  auto optional_columns =
116  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
118  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
119  }
120  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
121  updateMetadata(column_id_to_chunk_map, fragment_id);
122 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
static SysCatalog & instance()
Definition: SysCatalog.h:337
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:223
#define DEBUG_TIMER(name)
Definition: Logger.h:370
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap & buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 83 of file AbstractTextFileDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::init_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

87  {
88  for (const auto column : columns) {
89  ChunkKey data_chunk_key = {
90  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
91  init_chunk_for_column(data_chunk_key,
93  buffers,
94  column_id_to_chunk_map[column->columnId]);
95  }
96 }
std::vector< int > ChunkKey
Definition: types.h:37
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
override virtual

Populates provided chunk metadata vector with metadata for table specified in given chunk key. Metadata scan for text file(s) configured for foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with text content read from file and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse for subsequent requests in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 840 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata(), append_start_offset_, threading_serial::async(), foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, CHECK_EQ, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_metadata_scan_requests(), file_reader_, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_ORDER_BY_KEY, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_REGEX_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::OptionsContainer::getOption(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex, foreign_storage::AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

841  {
842  auto timer = DEBUG_TIMER(__func__);
843 
844  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
845  const auto file_path = getFullFilePath(foreign_table_);
847  CHECK(catalog);
848  auto& parser = getFileBufferParser();
849  auto& server_options = foreign_table_->foreign_server->options;
850  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
851  parser.validateFiles(file_reader_.get(), foreign_table_);
852  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
853  file_reader_->checkForMoreRows(append_start_offset_);
854  } else {
855  UNREACHABLE();
856  }
857  } else {
858  // Should only be called once for non-append tables
859  CHECK(chunk_metadata_map_.empty());
861  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
862  file_reader_ = std::make_unique<LocalMultiFileReader>(
863  file_path,
864  copy_params,
870  } else {
871  UNREACHABLE();
872  }
873  parser.validateFiles(file_reader_.get(), foreign_table_);
874  num_rows_ = 0;
876  }
877 
878  auto columns =
879  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
880  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
881  for (auto column : columns) {
882  column_by_id[column->columnId] = column;
883  }
884  MetadataScanMultiThreadingParams multi_threading_params;
885  multi_threading_params.disable_cache = disable_cache_;
886 
887  // Restore previous chunk data
888  if (foreign_table_->isAppendMode()) {
889  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
890  }
891 
892  std::set<int> columns_to_scan;
893  for (auto column : columns) {
894  if (!skip_metadata_scan(column)) {
895  columns_to_scan.insert(column->columnId);
896  }
897  }
898 
899  // Track where scan started for appends
900  int start_row = num_rows_;
901  if (!file_reader_->isScanFinished()) {
902  auto buffer_size = get_buffer_size(copy_params,
903  file_reader_->isRemainingSizeKnown(),
904  file_reader_->getRemainingSize());
905  auto thread_count = get_thread_count(copy_params,
906  file_reader_->isRemainingSizeKnown(),
907  file_reader_->getRemainingSize(),
908  buffer_size);
909  multi_threading_params.continue_processing = true;
910 
911  std::vector<std::future<void>> futures{};
912  for (size_t i = 0; i < thread_count; i++) {
913  multi_threading_params.request_pool.emplace(buffer_size,
914  copy_params,
915  db_id_,
917  columns_to_scan,
919  nullptr,
921  // TODO: when the cache is re-enabled for the import case, the above
922  // relationship between `disable_cache_` and `track_rejected_rows`
923  // will no longer hold and will need to be addressed using a different
924  // approach
925 
926  futures.emplace_back(std::async(std::launch::async,
928  std::ref(multi_threading_params),
930  std::ref(parser)));
931  }
932 
933  try {
935  file_path,
936  (*file_reader_),
937  copy_params,
938  multi_threading_params,
939  num_rows_,
942  } catch (...) {
943  {
944  std::unique_lock<std::mutex> pending_requests_lock(
945  multi_threading_params.pending_requests_mutex);
946  multi_threading_params.continue_processing = false;
947  }
948  multi_threading_params.pending_requests_condition.notify_all();
949  throw;
950  }
951 
952  for (auto& future : futures) {
953  // get() instead of wait() because we need to propagate potential exceptions.
954  future.get();
955  }
956  }
957 
958  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
959  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
960  CHECK(column_entry != column_by_id.end());
961  const auto& column_type = column_entry->second->columnType;
962  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
963  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
964  const auto& cached_chunks = multi_threading_params.cached_chunks;
965  if (!column_type.is_varlen_indeed()) {
966  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
967  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
968  chunk_entry != cached_chunks.end()) {
969  auto buffer = chunk_entry->second.getBuffer();
970  CHECK(buffer);
971  chunk_metadata->numBytes = buffer->size();
972  } else {
973  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
974  }
975  chunk_metadata_map_[chunk_key] = chunk_metadata;
976  }
977 
978  for (auto column : columns) {
979  if (skip_metadata_scan(column)) {
981  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
982  }
983  }
984 
985  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
986  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
987  }
988 
989  // Save chunk data
990  if (foreign_table_->isAppendMode()) {
991  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
992  }
993 }
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser)
#define CHECK_EQ(x, y)
Definition: Logger.h:231
#define UNREACHABLE()
Definition: Logger.h:267
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::optional< std::string > getOption(const std::string_view &key) const
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:223
#define DEBUG_TIMER(name)
Definition: Logger.h:370
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id,
AbstractBuffer * delete_buffer 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map - map of column id to chunks to be populated
fragment_id - fragment id of given chunks
delete_buffer - optional buffer to store deleted row indices

Definition at line 270 of file AbstractTextFileDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), threading_serial::async(), CHECK, db_id_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::get_buffer_size(), foreign_storage::get_thread_count(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::parser, render_group_analyzer_map_, Data_Namespace::AbstractBuffer::reserve(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunkBuffers().

273  {
274  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
275 
276  CHECK(!column_id_to_chunk_map.empty());
277  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
278  CHECK(!file_regions.empty());
279 
280  const auto buffer_size = get_buffer_size(file_regions);
281  const auto thread_count = get_thread_count(copy_params, file_regions);
282 
283  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
284 
285  std::vector<ParseBufferRequest> parse_file_requests{};
286  parse_file_requests.reserve(thread_count);
287  std::vector<std::future<ParseFileRegionResult>> futures{};
288  std::set<int> column_filter_set;
289  for (const auto& pair : column_id_to_chunk_map) {
290  column_filter_set.insert(pair.first);
291  }
292 
293  std::vector<std::unique_ptr<FileReader>> file_readers;
294  rapidjson::Value reader_metadata(rapidjson::kObjectType);
295  rapidjson::Document d;
296  auto& server_options = foreign_table_->foreign_server->options;
297  file_reader_->serialize(reader_metadata, d.GetAllocator());
298  const auto file_path = getFullFilePath(foreign_table_);
299  auto& parser = getFileBufferParser();
300 
301  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
302  parse_file_requests.emplace_back(buffer_size,
303  copy_params,
304  db_id_,
306  column_filter_set,
307  file_path,
309  delete_buffer != nullptr);
310  auto start_index = i;
311  auto end_index =
312  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
313 
314  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
315  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
316  file_path, copy_params, reader_metadata));
317  } else {
318  UNREACHABLE();
319  }
320 
321  futures.emplace_back(std::async(std::launch::async,
323  std::ref(file_regions),
324  start_index,
325  end_index,
326  std::ref(*(file_readers.back())),
327  std::ref(parse_file_requests.back()),
328  std::ref(column_id_to_chunk_map),
329  std::ref(parser)));
330  }
331 
332  for (auto& future : futures) {
333  future.wait();
334  }
335 
336  std::vector<ParseFileRegionResult> load_file_region_results{};
337  for (auto& future : futures) {
338  load_file_region_results.emplace_back(future.get());
339  }
340 
341  std::set<size_t> chunk_rejected_row_indices;
342  size_t chunk_offset = 0;
343  for (auto result : load_file_region_results) {
344  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
345  chunk.appendData(
346  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
347  }
348  for (const auto& rejected_row_index : result.rejected_row_indices) {
349  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
350  }
351  chunk_offset += result.row_count;
352  }
353 
354  if (delete_buffer) {
355  auto chunk_element_count = chunk_offset;
356  delete_buffer->reserve(chunk_element_count);
357  for (size_t i = 0; i < chunk_element_count; ++i) {
358  if (chunk_rejected_row_indices.find(i) != chunk_rejected_row_indices.end()) {
359  int8_t true_byte = true;
360  delete_buffer->append(&true_byte, 1);
361  } else {
362  int8_t false_byte = false;
363  delete_buffer->append(&false_byte, 1);
364  }
365  }
366  }
367 }
#define UNREACHABLE()
Definition: Logger.h:267
virtual const TextFileBufferParser & getFileBufferParser() const =0
future< Result > async(Fn &&fn, Args &&...args)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:223
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void reserve(size_t num_bytes)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata 
)
override virtual

Restore the internal state of the data wrapper.

Parameters
file_path - location of file created by serializeMetadata
chunk_metadata - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1017 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1019  {
1020  auto d = json_utils::read_from_file(file_path);
1021  CHECK(d.IsObject());
1022 
1023  // Restore fragment map
1025  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1026 
1027  // Construct reader with metadata
1028  CHECK(d.HasMember("reader_metadata"));
1029  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1030  const auto full_file_path = getFullFilePath(foreign_table_);
1031  auto& server_options = foreign_table_->foreign_server->options;
1032  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1033  file_reader_ = std::make_unique<LocalMultiFileReader>(
1034  full_file_path, copy_params, d["reader_metadata"]);
1035  } else {
1036  UNREACHABLE();
1037  }
1038 
1040  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1041 
1042  // Now restore the internal metadata maps
1043  CHECK(chunk_metadata_map_.empty());
1044  CHECK(chunk_encoder_buffers_.empty());
1045 
1046  for (auto& pair : chunk_metadata) {
1047  chunk_metadata_map_[pair.first] = pair.second;
1048 
1049  if (foreign_table_->isAppendMode()) {
1050  // Restore encoder state for append mode
1051  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1052  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1053  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1054  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1055  pair.second->numElements);
1056  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1057  pair.second->chunkStats);
1058  chunk_encoder_buffers_[pair.first]->setUpdated();
1059  }
1060  }
1061  is_restored_ = true;
1062 }
#define UNREACHABLE()
Definition: Logger.h:267
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
virtual const TextFileBufferParser & getFileBufferParser() const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
rapidjson::Document read_from_file(const std::string &file_path)
bool isAppendMode() const
Checks if the table is in append mode.
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:223
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 125 of file AbstractTextFileDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

127  {
129  CHECK(catalog);
130  for (auto& entry : column_id_to_chunk_map) {
131  const auto& column =
132  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
133  if (skip_metadata_scan(column)) {
134  ChunkKey data_chunk_key = {
135  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
136  if (column->columnType.is_varlen_indeed()) {
137  data_chunk_key.emplace_back(1);
138  }
139  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
140  // Allocate a new shared_ptr for metadata so we don't modify the old one, which may be
141  // used by the executor
142  auto cached_metadata_previous =
143  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
144  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
145  std::make_shared<ChunkMetadata>();
146  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
147  *cached_metadata = *cached_metadata_previous;
148  auto chunk_metadata =
149  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
150  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
151  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
152  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
153  cached_metadata->numBytes = entry.second.getBuffer()->size();
154  }
155  }
156 }
std::vector< int > ChunkKey
Definition: types.h:37
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::AbstractTextFileDataWrapper::append_start_offset_
private
std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::AbstractTextFileDataWrapper::chunk_encoder_buffers_
private
std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::AbstractTextFileDataWrapper::chunk_metadata_map_
private
const int foreign_storage::AbstractTextFileDataWrapper::db_id_
private
const bool foreign_storage::AbstractTextFileDataWrapper::disable_cache_
private

Definition at line 109 of file AbstractTextFileDataWrapper.h.

Referenced by populateChunkMetadata().

std::unique_ptr<FileReader> foreign_storage::AbstractTextFileDataWrapper::file_reader_
private
const ForeignTable* foreign_storage::AbstractTextFileDataWrapper::foreign_table_
private
std::map<int, FileRegions> foreign_storage::AbstractTextFileDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_restored_
private

Definition at line 104 of file AbstractTextFileDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::AbstractTextFileDataWrapper::num_rows_
private
RenderGroupAnalyzerMap foreign_storage::AbstractTextFileDataWrapper::render_group_analyzer_map_
private

Definition at line 114 of file AbstractTextFileDataWrapper.h.

Referenced by createRenderGroupAnalyzers(), and populateChunks().

const UserMapping* foreign_storage::AbstractTextFileDataWrapper::user_mapping_
private

Definition at line 106 of file AbstractTextFileDataWrapper.h.


The documentation for this class was generated from the following files: