OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::AbstractTextFileDataWrapper Class Reference [abstract]

#include <AbstractTextFileDataWrapper.h>

+ Inheritance diagram for foreign_storage::AbstractTextFileDataWrapper:
+ Collaboration diagram for foreign_storage::AbstractTextFileDataWrapper:

Classes

struct  ResidualBuffer
 

Public Member Functions

 AbstractTextFileDataWrapper ()
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping, const bool disable_cache)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
void createRenderGroupAnalyzers () override
 Create RenderGroupAnalyzers for poly columns. More...
 
bool isLazyFragmentFetchingEnabled () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Protected Member Functions

virtual const
TextFileBufferParser & 
getFileBufferParser () const =0
 
virtual std::optional< size_t > getMaxFileCount () const
 

Private Member Functions

 AbstractTextFileDataWrapper (const ForeignTable *foreign_table)
 
void iterativeFileScan (ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void updateRolledOffChunks (const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< FileReader > file_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 
const UserMapping * user_mapping_
 
const bool disable_cache_
 
bool is_first_file_scan_call_
 
bool is_file_scan_in_progress_
 
int iterative_scan_last_fragment_id_
 
RenderGroupAnalyzerMap render_group_analyzer_map_
 
MetadataScanMultiThreadingParams multi_threading_params_
 
size_t buffer_size_
 
size_t thread_count_
 
ResidualBuffer residual_buffer_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 92 of file AbstractTextFileDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping,
const bool  disable_cache 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const ForeignTable * foreign_table)
private

Member Function Documentation

void foreign_storage::AbstractTextFileDataWrapper::createRenderGroupAnalyzers ( )
overridevirtual

Create RenderGroupAnalyzers for poly columns.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 1756 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_GE, db_id_, foreign_table_, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), IS_GEO_POLY, render_group_analyzer_map_, and TableDescriptor::tableId.

1756  {
1757  // must have these
1758  CHECK_GE(db_id_, 0);
1760 
1761  // populate map for all poly columns in this table
1763  CHECK(catalog);
1764  auto columns =
1765  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1766  for (auto const& column : columns) {
1767  if (IS_GEO_POLY(column->columnType.get_type())) {
1769  .try_emplace(column->columnId,
1770  std::make_unique<import_export::RenderGroupAnalyzer>())
1771  .second);
1772  }
1773  }
1774 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:291
#define IS_GEO_POLY(T)
Definition: sqltypes.h:305

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 115 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

virtual const TextFileBufferParser& foreign_storage::AbstractTextFileDataWrapper::getFileBufferParser ( ) const
protectedpure virtual

Implemented in foreign_storage::CsvDataWrapper, foreign_storage::RegexParserDataWrapper, and foreign_storage::InternalLogsDataWrapper.

Referenced by iterativeFileScan(), populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

+ Here is the caller graph for this function:

std::optional< size_t > foreign_storage::AbstractTextFileDataWrapper::getMaxFileCount ( ) const
protectedvirtual

Reimplemented in foreign_storage::InternalLogsDataWrapper.

Definition at line 1776 of file AbstractTextFileDataWrapper.cpp.

Referenced by iterativeFileScan(), and populateChunkMetadata().

1776  {
1777  return {};
1778 }

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getNonCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 117 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::string foreign_storage::AbstractTextFileDataWrapper::getSerializedDataWrapper ( ) const
overridevirtual

Serializes the internal state of the wrapper and returns it as a string; this implementation produces a JSON document.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1680 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, file_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_string().

1680  {
1681  rapidjson::Document d;
1682  d.SetObject();
1683 
1684  // Save fragment map
1687  "fragment_id_to_file_regions_map",
1688  d.GetAllocator());
1689 
1690  // Save reader metadata
1691  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1692  file_reader_->serialize(reader_metadata, d.GetAllocator());
1693  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1694 
1695  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1697  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1698 
1699  return json_utils::write_to_string(d);
1700 }
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:157
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

bool foreign_storage::AbstractTextFileDataWrapper::isLazyFragmentFetchingEnabled ( ) const
inlineoverridevirtual

If true, the data wrapper implements a lazy fragment fetching mode. This mode allows requests for fragments to be issued to populateChunks without the prerequisite that populateChunkMetadata has successfully finished execution. This is an optimization that has some specific use-cases and is not required.

NOTE: this mode is not guaranteed to work as expected when combined with certain types of refresh modes such as append. This is subject to change in the future, but has no impact on the intended use-cases of this mode.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 123 of file AbstractTextFileDataWrapper.h.

123 { return true; }
bool foreign_storage::AbstractTextFileDataWrapper::isRestored ( ) const
overridevirtual
void foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan ( ChunkMetadataVector & chunk_metadata_vector,
IterativeFileScanParameters & file_scan_param 
)
private

Implements an iterative file scan that enables populating chunks fragment by fragment.

Definition at line 1516 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, threading_serial::async(), buffer_size_, CHECK, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, foreign_storage::ForeignTable::isAppendMode(), iterative_scan_last_fragment_id_, multi_threading_params_, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::populate_chunks(), render_group_analyzer_map_, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::reset_multithreading_params(), residual_buffer_, TableDescriptor::tableId, thread_count_, user_mapping_, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunks().

1518  {
1519  auto timer = DEBUG_TIMER(__func__);
1520 
1522 
1524  << " iterative file scan can not be used with APPEND mode.";
1525 
1526  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1527  const auto file_path = getFullFilePath(foreign_table_);
1529  CHECK(catalog);
1530  auto& parser = getFileBufferParser();
1531  const auto file_path_options = getFilePathOptions(foreign_table_);
1532  auto& server_options = foreign_table_->foreign_server->options;
1533 
1539  server_options,
1540  file_reader_,
1541  file_path,
1542  copy_params,
1543  file_path_options,
1544  getMaxFileCount(),
1546  user_mapping_,
1547  parser,
1548  [] { return ""; },
1549  num_rows_,
1551  }
1552 
1553  auto columns =
1554  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1555  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1556  for (auto column : columns) {
1557  column_by_id[column->columnId] = column;
1558  }
1559 
1560  if (is_first_file_scan_call_) { // reinitialize all members that may have state in
1561  // `multi_threading_params_`
1563  }
1564 
1566 
1567  std::set<int> columns_to_scan;
1568  for (auto column : columns) {
1569  columns_to_scan.insert(column->columnId);
1570  }
1571 
1574  // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
1575  // scan
1576  buffer_size_ = get_buffer_size(copy_params,
1577  file_reader_->isRemainingSizeKnown(),
1578  file_reader_->getRemainingSize());
1579  thread_count_ = get_thread_count(copy_params,
1580  file_reader_->isRemainingSizeKnown(),
1581  file_reader_->getRemainingSize(),
1582  buffer_size_);
1583  }
1585 
1586  std::vector<std::future<void>> futures{};
1587  for (size_t i = 0; i < thread_count_; i++) {
1590  copy_params,
1591  db_id_,
1593  columns_to_scan,
1596  true);
1597  }
1598  futures.emplace_back(std::async(std::launch::async,
1600  std::ref(multi_threading_params_),
1602  std::ref(parser),
1603  std::ref(file_scan_param)));
1604  }
1605 
1607  buffer_size_,
1608  file_path,
1609  (*file_reader_),
1610  copy_params,
1612  num_rows_,
1615  &file_scan_param,
1619 
1620  for (auto& future : futures) {
1621  // get() instead of wait() because we need to propagate potential exceptions.
1622  future.get();
1623  }
1624  }
1625 
1627  is_first_file_scan_call_ = false;
1628  }
1629 
1632  }
1633 }
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void reset_multithreading_params(foreign_storage::MetadataScanMultiThreadingParams &multi_threading_params)
virtual const TextFileBufferParser & getFileBufferParser() const =0
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void populate_chunks(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser, foreign_storage::IterativeFileScanParameters &file_scan_param)
virtual std::optional< size_t > getMaxFileCount() const
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 119 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), is_file_scan_in_progress_, populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

122  {
123  auto timer = DEBUG_TIMER(__func__);
125  CHECK(catalog);
126  CHECK(!required_buffers.empty());
127 
128  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
129  auto required_columns =
130  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
131  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
133  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
134 
135  if (!optional_buffers.empty()) {
136  auto optional_columns =
137  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
139  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
140  }
141  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
143  updateMetadata(column_id_to_chunk_map, fragment_id);
144  }
145 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
static SysCatalog & instance()
Definition: SysCatalog.h:343
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap & buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 104 of file AbstractTextFileDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::init_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

108  {
109  for (const auto column : columns) {
110  ChunkKey data_chunk_key = {
111  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
112  init_chunk_for_column(data_chunk_key,
114  buffers,
115  column_id_to_chunk_map[column->columnId]);
116  }
117 }
std::vector< int > ChunkKey
Definition: types.h:36
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
overridevirtual

Populates provided chunk metadata vector with metadata for table specified in given chunk key. Metadata scan for text file(s) configured for foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with text content read from file and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse for subsequent requests in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1363 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata(), foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), append_start_offset_, threading_serial::async(), foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, CHECK_EQ, foreign_storage::MultiFileReader::checkForRolledOffFiles(), foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, updateRolledOffChunks(), user_mapping_, and 
foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1364  {
1365  auto timer = DEBUG_TIMER(__func__);
1366 
1367  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1368  const auto file_path = getFullFilePath(foreign_table_);
1370  CHECK(catalog);
1371  auto& parser = getFileBufferParser();
1372  const auto file_path_options = getFilePathOptions(foreign_table_);
1373  auto& server_options = foreign_table_->foreign_server->options;
1374  std::set<std::string> rolled_off_files;
1375  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
1376  auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
1377  if (allowFileRollOff(foreign_table_) && multi_file_reader) {
1378  rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
1379  }
1380  parser.validateFiles(file_reader_.get(), foreign_table_);
1381  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1382  file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
1383  } else {
1384  UNREACHABLE();
1385  }
1386  } else {
1390  server_options,
1391  file_reader_,
1392  file_path,
1393  copy_params,
1394  file_path_options,
1395  getMaxFileCount(),
1397  user_mapping_,
1398  parser,
1399  [] { return ""; },
1400  num_rows_,
1402  }
1403 
1404  auto columns =
1405  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1406  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1407  for (auto column : columns) {
1408  column_by_id[column->columnId] = column;
1409  }
1410  MetadataScanMultiThreadingParams multi_threading_params;
1411  multi_threading_params.disable_cache = disable_cache_;
1412 
1413  // Restore previous chunk data
1414  if (foreign_table_->isAppendMode()) {
1415  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
1416  }
1417 
1418  std::set<int> columns_to_scan;
1419  for (auto column : columns) {
1420  if (!skip_metadata_scan(column)) {
1421  columns_to_scan.insert(column->columnId);
1422  }
1423  }
1424 
1425  // Track where scan started for appends
1426  int start_row = num_rows_;
1427  if (!file_reader_->isScanFinished()) {
1428  auto buffer_size = get_buffer_size(copy_params,
1429  file_reader_->isRemainingSizeKnown(),
1430  file_reader_->getRemainingSize());
1431  auto thread_count = get_thread_count(copy_params,
1432  file_reader_->isRemainingSizeKnown(),
1433  file_reader_->getRemainingSize(),
1434  buffer_size);
1435  multi_threading_params.continue_processing = true;
1436 
1437  std::vector<std::future<void>> futures{};
1438  for (size_t i = 0; i < thread_count; i++) {
1439  multi_threading_params.request_pool.emplace(buffer_size,
1440  copy_params,
1441  db_id_,
1443  columns_to_scan,
1445  nullptr,
1446  disable_cache_);
1447 
1448  futures.emplace_back(std::async(std::launch::async,
1449  scan_metadata,
1450  std::ref(multi_threading_params),
1452  std::ref(parser)));
1453  }
1454 
1455  ResidualBuffer residual_buffer;
1457  buffer_size,
1458  file_path,
1459  (*file_reader_),
1460  copy_params,
1461  multi_threading_params,
1462  num_rows_,
1465  nullptr,
1466  residual_buffer,
1467  true);
1468 
1469  for (auto& future : futures) {
1470  // get() instead of wait() because we need to propagate potential exceptions.
1471  future.get();
1472  }
1473  }
1474 
1475  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1476  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
1477  CHECK(column_entry != column_by_id.end());
1478  const auto& column_type = column_entry->second->columnType;
1479  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
1480  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1481  const auto& cached_chunks = multi_threading_params.cached_chunks;
1482  if (!column_type.is_varlen_indeed()) {
1483  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
1484  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
1485  chunk_entry != cached_chunks.end()) {
1486  auto buffer = chunk_entry->second.getBuffer();
1487  CHECK(buffer);
1488  chunk_metadata->numBytes = buffer->size();
1489  } else {
1490  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
1491  }
1492  chunk_metadata_map_[chunk_key] = chunk_metadata;
1493  }
1494 
1495  for (auto column : columns) {
1496  if (skip_metadata_scan(column)) {
1498  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1499  }
1500  }
1501 
1502  if (!rolled_off_files.empty()) {
1503  updateRolledOffChunks(rolled_off_files, column_by_id);
1504  }
1505 
1506  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1507  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1508  }
1509 
1510  // Save chunk data
1511  if (foreign_table_->isAppendMode()) {
1512  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1513  }
1514 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:301
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void updateRolledOffChunks(const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
#define UNREACHABLE()
Definition: Logger.h:337
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
virtual std::optional< size_t > getMaxFileCount() const
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id,
AbstractBuffer *  delete_buffer 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map - map of column id to chunks to be populated
fragment_id - fragment id of given chunks
delete_buffer - optional buffer to store deleted row indices

Definition at line 339 of file AbstractTextFileDataWrapper.cpp.

References threading_serial::async(), CHECK, db_id_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), Data_Namespace::AbstractBuffer::getMemoryPtr(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, iterativeFileScan(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, multi_threading_params_, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::parser, render_group_analyzer_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error(), UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunkBuffers().

342  {
343  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
344 
345  CHECK(!column_id_to_chunk_map.empty());
346 
347  // check to see if an iterative scan step is required
348  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
349  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
351  // check to see if there is more foreign data to scan
354  // NOTE: we can only guarantee the current `fragment_id` is fully done
355  // iterative scan if either
356  // 1) the scan is finished OR
357  // 2) `fragment_id+1` exists in the internal map
358  // this is why `fragment_id+1` is checked for below
359  auto file_regions_it_one_ahead =
360  fragment_id_to_file_regions_map_.find(fragment_id + 1);
362  (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
363  ChunkMetadataVector chunk_metadata_vector;
364  IterativeFileScanParameters iterative_params{
365  column_id_to_chunk_map, fragment_id, delete_buffer};
366  iterativeFileScan(chunk_metadata_vector, iterative_params);
367  }
368  }
369 
370  file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
371  if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
373  is_file_scan_in_progress_ = false; // conclude the iterative scan is finished
375  true; // any subsequent iterative request can assume they will be the first
377  foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
378  } else {
379  // iterative scan is required to have loaded all required chunks thus we
380  // can exit early
381  return;
382  }
383  }
384  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());
385 
386  const auto& file_regions = file_regions_it->second;
387 
388  // File roll off can lead to empty file regions.
389  if (file_regions.empty()) {
390  return;
391  }
392 
393  const auto buffer_size = get_buffer_size(file_regions);
394  const auto thread_count = get_thread_count(copy_params, file_regions);
395 
396  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
397 
398  std::vector<ParseBufferRequest> parse_file_requests{};
399  parse_file_requests.reserve(thread_count);
400  std::vector<std::future<ParseFileRegionResult>> futures{};
401  std::set<int> column_filter_set;
402  for (const auto& pair : column_id_to_chunk_map) {
403  column_filter_set.insert(pair.first);
404  }
405 
406  std::vector<std::unique_ptr<FileReader>> file_readers;
407  rapidjson::Value reader_metadata(rapidjson::kObjectType);
408  rapidjson::Document d;
409  auto& server_options = foreign_table_->foreign_server->options;
410  file_reader_->serialize(reader_metadata, d.GetAllocator());
411  const auto file_path = getFullFilePath(foreign_table_);
412  auto& parser = getFileBufferParser();
413 
414  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
415  parse_file_requests.emplace_back(buffer_size,
416  copy_params,
417  db_id_,
419  column_filter_set,
420  file_path,
422  delete_buffer != nullptr);
423  auto start_index = i;
424  auto end_index =
425  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
426 
427  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
428  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
429  file_path, copy_params, reader_metadata));
430  } else {
431  UNREACHABLE();
432  }
433 
434  futures.emplace_back(std::async(std::launch::async,
436  std::ref(file_regions),
437  start_index,
438  end_index,
439  std::ref(*(file_readers.back())),
440  std::ref(parse_file_requests.back()),
441  std::ref(column_id_to_chunk_map),
442  std::ref(parser)));
443  }
444 
445  for (auto& future : futures) {
446  future.wait();
447  }
448 
449  std::vector<ParseFileRegionResult> load_file_region_results{};
450  for (auto& future : futures) {
451  load_file_region_results.emplace_back(future.get());
452  }
453 
454  std::set<size_t> chunk_rejected_row_indices;
455  size_t chunk_offset = 0;
456  for (auto result : load_file_region_results) {
457  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
458  chunk.appendData(
459  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
460  }
461  for (const auto& rejected_row_index : result.rejected_row_indices) {
462  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
463  }
464  chunk_offset += result.row_count;
465  }
466 
467  if (delete_buffer) {
468  // ensure delete buffer is sized appropriately
469  resize_delete_buffer(delete_buffer, chunk_offset);
470 
471  auto delete_buffer_data = delete_buffer->getMemoryPtr();
472  for (const auto rejected_row_index : chunk_rejected_row_indices) {
473  delete_buffer_data[rejected_row_index] = true;
474  }
475  }
476 }
void resize_delete_buffer(AbstractBuffer *delete_buffer, const size_t chunk_element_count)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:337
virtual const TextFileBufferParser & getFileBufferParser() const =0
void iterativeFileScan(ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void throw_fragment_id_out_of_bounds_error(const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:291
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
overridevirtual

Restore internal state of datawrapper

Parameters
file_path - location of file created by serializeMetadata
chunk_metadata - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1702 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1704  {
1705  auto d = json_utils::read_from_file(file_path);
1706  CHECK(d.IsObject());
1707 
1708  // Restore fragment map
1710  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1711 
1712  // Construct reader with metadata
1713  CHECK(d.HasMember("reader_metadata"));
1714  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1715  const auto full_file_path = getFullFilePath(foreign_table_);
1716  auto& server_options = foreign_table_->foreign_server->options;
1717  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1718  file_reader_ = std::make_unique<LocalMultiFileReader>(
1719  full_file_path, copy_params, d["reader_metadata"]);
1720  } else {
1721  UNREACHABLE();
1722  }
1723 
1725  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1726 
1727  // Now restore the internal metadata maps
1728  CHECK(chunk_metadata_map_.empty());
1729  CHECK(chunk_encoder_buffers_.empty());
1730 
1731  for (auto& pair : chunk_metadata) {
1732  chunk_metadata_map_[pair.first] = pair.second;
1733 
1734  if (foreign_table_->isAppendMode()) {
1735  // Restore encoder state for append mode
1736  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1737  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1738  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1739  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1740  pair.second->numElements);
1741  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1742  pair.second->chunkStats);
1743  chunk_encoder_buffers_[pair.first]->setUpdated();
1744  }
1745  }
1746  is_restored_ = true;
1747 }
#define UNREACHABLE()
Definition: Logger.h:337
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
virtual const TextFileBufferParser & getFileBufferParser() const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:172
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
rapidjson::Document read_from_file(const std::string &file_path)
bool isAppendMode() const
Checks if the table is in append mode.
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:291
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 148 of file AbstractTextFileDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

150  {
152  CHECK(catalog);
153  for (auto& entry : column_id_to_chunk_map) {
154  const auto& column =
155  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
156  if (skip_metadata_scan(column)) {
157  ChunkKey data_chunk_key = {
158  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
159  if (column->columnType.is_varlen_indeed()) {
160  data_chunk_key.emplace_back(1);
161  }
162  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
163  // Allocate new shared_ptr for metadata so we don't modify old one which may be
164  // used by executor
165  auto cached_metadata_previous =
166  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
167  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
168  std::make_shared<ChunkMetadata>();
169  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
170  *cached_metadata = *cached_metadata_previous;
171  auto chunk_metadata =
172  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
173  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
174  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
175  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
176  cached_metadata->numBytes = entry.second.getBuffer()->size();
177  }
178  }
179 }
std::vector< int > ChunkKey
Definition: types.h:36
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateRolledOffChunks ( const std::set< std::string > &  rolled_off_files,
const std::map< int32_t, const ColumnDescriptor * > &  column_by_id 
)
private

Definition at line 1635 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, shared::contains(), fragment_id_to_file_regions_map_, shared::get_from_map(), and foreign_storage::get_placeholder_metadata().

Referenced by populateChunkMetadata().

1637  {
1638  std::set<int32_t> deleted_fragment_ids;
1639  std::optional<int32_t> partially_deleted_fragment_id;
1640  std::optional<size_t> partially_deleted_fragment_row_count;
1641  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
1642  bool file_region_deleted{false};
1643  for (auto it = file_regions.begin(); it != file_regions.end();) {
1644  if (shared::contains(rolled_off_files, it->file_path)) {
1645  it = file_regions.erase(it);
1646  file_region_deleted = true;
1647  } else {
1648  it++;
1649  }
1650  }
1651  if (file_regions.empty()) {
1652  deleted_fragment_ids.emplace(fragment_id);
1653  } else if (file_region_deleted) {
1654  partially_deleted_fragment_id = fragment_id;
1655  partially_deleted_fragment_row_count = 0;
1656  for (const auto& file_region : file_regions) {
1657  partially_deleted_fragment_row_count.value() += file_region.row_count;
1658  }
1659  break;
1660  }
1661  }
1662 
1663  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1664  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
1665  chunk_metadata->numElements = 0;
1666  chunk_metadata->numBytes = 0;
1667  } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
1668  CHECK(partially_deleted_fragment_row_count.has_value());
1669  auto old_chunk_stats = chunk_metadata->chunkStats;
1670  auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
1671  chunk_metadata = get_placeholder_metadata(
1672  cd->columnType, partially_deleted_fragment_row_count.value());
1673  // Old chunk stats will still be correct (since only row deletion is occurring)
1674  // and more accurate than that of the placeholder metadata.
1675  chunk_metadata->chunkStats = old_chunk_stats;
1676  }
1677  }
1678 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::AbstractTextFileDataWrapper::append_start_offset_
private
size_t foreign_storage::AbstractTextFileDataWrapper::buffer_size_
private

Definition at line 205 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::AbstractTextFileDataWrapper::chunk_encoder_buffers_
private
std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::AbstractTextFileDataWrapper::chunk_metadata_map_
private
const int foreign_storage::AbstractTextFileDataWrapper::db_id_
private
const bool foreign_storage::AbstractTextFileDataWrapper::disable_cache_
private

Definition at line 190 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().

std::unique_ptr<FileReader> foreign_storage::AbstractTextFileDataWrapper::file_reader_
private
std::map<int, FileRegions> foreign_storage::AbstractTextFileDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_file_scan_in_progress_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_first_file_scan_call_
private

Definition at line 192 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

bool foreign_storage::AbstractTextFileDataWrapper::is_restored_
private

Definition at line 185 of file AbstractTextFileDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::AbstractTextFileDataWrapper::iterative_scan_last_fragment_id_
private

Definition at line 196 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

MetadataScanMultiThreadingParams foreign_storage::AbstractTextFileDataWrapper::multi_threading_params_
private

Definition at line 204 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

size_t foreign_storage::AbstractTextFileDataWrapper::num_rows_
private
RenderGroupAnalyzerMap foreign_storage::AbstractTextFileDataWrapper::render_group_analyzer_map_
private
ResidualBuffer foreign_storage::AbstractTextFileDataWrapper::residual_buffer_
private

Definition at line 208 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

size_t foreign_storage::AbstractTextFileDataWrapper::thread_count_
private

Definition at line 206 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

const UserMapping* foreign_storage::AbstractTextFileDataWrapper::user_mapping_
private

Definition at line 187 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().


The documentation for this class was generated from the following files: