OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::AbstractTextFileDataWrapper Class Reference (abstract)

#include <AbstractTextFileDataWrapper.h>

+ Inheritance diagram for foreign_storage::AbstractTextFileDataWrapper:
+ Collaboration diagram for foreign_storage::AbstractTextFileDataWrapper:

Classes

struct  ResidualBuffer
 

Public Member Functions

 AbstractTextFileDataWrapper ()
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping, const bool disable_cache)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
bool isLazyFragmentFetchingEnabled () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Protected Member Functions

virtual const
TextFileBufferParser & 
getFileBufferParser () const =0
 
virtual std::optional< size_t > getMaxFileCount () const
 

Private Member Functions

 AbstractTextFileDataWrapper (const ForeignTable *foreign_table)
 
void iterativeFileScan (ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void updateRolledOffChunks (const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< FileReader > file_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 
const UserMapping * user_mapping_
 
const bool disable_cache_
 
bool is_first_file_scan_call_
 
bool is_file_scan_in_progress_
 
int iterative_scan_last_fragment_id_
 
MetadataScanMultiThreadingParams multi_threading_params_
 
size_t buffer_size_
 
size_t thread_count_
 
ResidualBuffer residual_buffer_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 92 of file AbstractTextFileDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping,
const bool  disable_cache 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const ForeignTable * foreign_table)
private

Member Function Documentation

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 115 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

virtual const TextFileBufferParser& foreign_storage::AbstractTextFileDataWrapper::getFileBufferParser ( ) const
protected pure virtual

Implemented in foreign_storage::CsvDataWrapper, foreign_storage::RegexParserDataWrapper, and foreign_storage::InternalLogsDataWrapper.

Referenced by iterativeFileScan(), populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

+ Here is the caller graph for this function:

std::optional< size_t > foreign_storage::AbstractTextFileDataWrapper::getMaxFileCount ( ) const
protected virtual

Reimplemented in foreign_storage::InternalLogsDataWrapper.

Definition at line 1763 of file AbstractTextFileDataWrapper.cpp.

Referenced by iterativeFileScan(), and populateChunkMetadata().

1763  {
1764  return {};
1765 }

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 117 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::string foreign_storage::AbstractTextFileDataWrapper::getSerializedDataWrapper ( ) const
override virtual

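Serialize internal state of wrapper into file at given path if implemented. Note: as the signature and listing below show, this implementation takes no path argument; it builds a JSON document from the wrapper state and returns it as a string.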

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1690 of file AbstractTextFileDataWrapper.cpp.

References json_utils::add_value_to_object(), append_start_offset_, file_reader_, fragment_id_to_file_regions_map_, num_rows_, and json_utils::write_to_string().

1690  {
1691  rapidjson::Document d;
1692  d.SetObject();
1693 
1694  // Save fragment map
1697  "fragment_id_to_file_regions_map",
1698  d.GetAllocator());
1699 
1700  // Save reader metadata
1701  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1702  file_reader_->serialize(reader_metadata, d.GetAllocator());
1703  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1704 
1705  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1707  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1708 
1709  return json_utils::write_to_string(d);
1710 }
std::string write_to_string(const rapidjson::Document &document)
Definition: JsonUtils.cpp:225
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255

+ Here is the call graph for this function:

bool foreign_storage::AbstractTextFileDataWrapper::isLazyFragmentFetchingEnabled ( ) const
inline override virtual

If true, the data wrapper implements a lazy fragment fetching mode. This mode allows requests for fragments to be issued to populateChunks without the prerequisite that populateChunkMetadata has successfully finished execution. This is an optimization that has some specific use-cases and is not required.

NOTE: this mode is not guaranteed to work as expected when combined with certain types of refresh modes such as append. This is subject to change in the future, but has no impact on the intended use-cases of this mode.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 121 of file AbstractTextFileDataWrapper.h.

121 { return true; }
bool foreign_storage::AbstractTextFileDataWrapper::isRestored ( ) const
override virtual
void foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan ( ChunkMetadataVector & chunk_metadata_vector,
IterativeFileScanParameters & file_scan_param 
)
private

Implements an iterative file scan that enables populating chunks fragment by fragment.

Definition at line 1527 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, threading_serial::async(), buffer_size_, CHECK, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, foreign_storage::ForeignTable::isAppendMode(), iterative_scan_last_fragment_id_, multi_threading_params_, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::populate_chunks(), foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::reset_multithreading_params(), residual_buffer_, TableDescriptor::tableId, thread_count_, user_mapping_, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunks().

1529  {
1530  auto timer = DEBUG_TIMER(__func__);
1531 
1533 
1535  << " iterative file scan can not be used with APPEND mode.";
1536 
1537  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1538  const auto file_path = getFullFilePath(foreign_table_);
1540  CHECK(catalog);
1541  auto& parser = getFileBufferParser();
1542  const auto file_path_options = getFilePathOptions(foreign_table_);
1543  auto& server_options = foreign_table_->foreign_server->options;
1544 
1550  server_options,
1551  file_reader_,
1552  file_path,
1553  copy_params,
1554  file_path_options,
1555  getMaxFileCount(),
1557  user_mapping_,
1558  parser,
1559  [] { return ""; },
1560  num_rows_,
1562  }
1563 
1564  auto columns =
1565  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1566  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1567  for (auto column : columns) {
1568  column_by_id[column->columnId] = column;
1569  }
1570 
1571  if (is_first_file_scan_call_) { // reinitialize all members that may have state in
1572  // `multi_threading_params_`
1574  }
1575 
1577 
1578  std::set<int> columns_to_scan;
1579  for (auto column : columns) {
1580  columns_to_scan.insert(column->columnId);
1581  }
1582 
1585  // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
1586  // scan
1587  buffer_size_ = get_buffer_size(copy_params,
1588  file_reader_->isRemainingSizeKnown(),
1589  file_reader_->getRemainingSize());
1590  thread_count_ = get_thread_count(copy_params,
1591  file_reader_->isRemainingSizeKnown(),
1592  file_reader_->getRemainingSize(),
1593  buffer_size_);
1594  }
1596 
1597  std::vector<std::future<void>> futures{};
1598  for (size_t i = 0; i < thread_count_; i++) {
1601  copy_params,
1602  db_id_,
1604  columns_to_scan,
1606  true);
1607  }
1608  futures.emplace_back(std::async(std::launch::async,
1610  std::ref(multi_threading_params_),
1612  std::ref(parser),
1613  std::ref(file_scan_param)));
1614  }
1615 
1617  buffer_size_,
1618  file_path,
1619  (*file_reader_),
1620  copy_params,
1622  num_rows_,
1625  &file_scan_param,
1629 
1630  for (auto& future : futures) {
1631  // get() instead of wait() because we need to propagate potential exceptions.
1632  future.get();
1633  }
1634  }
1635 
1637  is_first_file_scan_call_ = false;
1638  }
1639 
1642  }
1643 }
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void reset_multithreading_params(foreign_storage::MetadataScanMultiThreadingParams &multi_threading_params)
virtual const TextFileBufferParser & getFileBufferParser() const =0
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void populate_chunks(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser, foreign_storage::IterativeFileScanParameters &file_scan_param)
virtual std::optional< size_t > getMaxFileCount() const
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 118 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), is_file_scan_in_progress_, populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

121  {
122  auto timer = DEBUG_TIMER(__func__);
124  CHECK(catalog);
125  CHECK(!required_buffers.empty());
126 
127  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
128  auto required_columns =
129  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
130  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
132  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
133 
134  if (!optional_buffers.empty()) {
135  auto optional_columns =
136  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
138  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
139  }
140  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
142  updateMetadata(column_id_to_chunk_map, fragment_id);
143  }
144 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
static SysCatalog & instance()
Definition: SysCatalog.h:343
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap & buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 103 of file AbstractTextFileDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::init_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

107  {
108  for (const auto column : columns) {
109  ChunkKey data_chunk_key = {
110  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
111  init_chunk_for_column(data_chunk_key,
113  buffers,
114  column_id_to_chunk_map[column->columnId]);
115  }
116 }
std::vector< int > ChunkKey
Definition: types.h:36
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
override virtual

Populates provided chunk metadata vector with metadata for table specified in given chunk key. Metadata scan for text file(s) configured for foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with text content read from file and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse for subsequent requests in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1373 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata(), foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), append_start_offset_, threading_serial::async(), foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, foreign_storage::MultiFileReader::checkForRolledOffFiles(), foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, updateRolledOffChunks(), user_mapping_, and 
foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1374  {
1375  auto timer = DEBUG_TIMER(__func__);
1376 
1377  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1378  const auto file_path = getFullFilePath(foreign_table_);
1380  CHECK(catalog);
1381  auto& parser = getFileBufferParser();
1382  const auto file_path_options = getFilePathOptions(foreign_table_);
1383  auto& server_options = foreign_table_->foreign_server->options;
1384  std::set<std::string> rolled_off_files;
1385  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
1386  auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
1387  if (allowFileRollOff(foreign_table_) && multi_file_reader) {
1388  rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
1389  }
1390  parser.validateFiles(file_reader_.get(), foreign_table_);
1391  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1392  file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
1393  } else {
1394  UNREACHABLE();
1395  }
1396  } else {
1400  server_options,
1401  file_reader_,
1402  file_path,
1403  copy_params,
1404  file_path_options,
1405  getMaxFileCount(),
1407  user_mapping_,
1408  parser,
1409  [] { return ""; },
1410  num_rows_,
1412  }
1413 
1414  auto columns =
1415  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1416  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1417  for (auto column : columns) {
1418  column_by_id[column->columnId] = column;
1419  }
1420  MetadataScanMultiThreadingParams multi_threading_params;
1421  multi_threading_params.disable_cache = disable_cache_;
1422 
1423  // Restore previous chunk data
1424  if (foreign_table_->isAppendMode()) {
1425  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
1426  }
1427 
1428  std::set<int> columns_to_scan;
1429  for (auto column : columns) {
1430  if (!skip_metadata_scan(column)) {
1431  columns_to_scan.insert(column->columnId);
1432  }
1433  }
1434 
1435  // Track where scan started for appends
1436  int start_row = num_rows_;
1438  if (!file_reader_->isScanFinished()) {
1439  auto buffer_size = get_buffer_size(copy_params,
1440  file_reader_->isRemainingSizeKnown(),
1441  file_reader_->getRemainingSize());
1442  auto thread_count = get_thread_count(copy_params,
1443  file_reader_->isRemainingSizeKnown(),
1444  file_reader_->getRemainingSize(),
1445  buffer_size);
1446  multi_threading_params.continue_processing = true;
1447 
1448  std::vector<std::future<void>> futures{};
1449  for (size_t i = 0; i < thread_count; i++) {
1450  multi_threading_params.request_pool.emplace(buffer_size,
1451  copy_params,
1452  db_id_,
1454  columns_to_scan,
1456  disable_cache_);
1457 
1458  futures.emplace_back(std::async(std::launch::async,
1459  scan_metadata,
1460  std::ref(multi_threading_params),
1462  std::ref(parser)));
1463  }
1464 
1465  ResidualBuffer residual_buffer;
1467  buffer_size,
1468  file_path,
1469  (*file_reader_),
1470  copy_params,
1471  multi_threading_params,
1472  num_rows_,
1475  nullptr,
1476  residual_buffer,
1477  true);
1478 
1479  for (auto& future : futures) {
1480  // get() instead of wait() because we need to propagate potential exceptions.
1481  future.get();
1482  }
1483  }
1484 
1485  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1486  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
1487  CHECK(column_entry != column_by_id.end());
1488  const auto& column_type = column_entry->second->columnType;
1489  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
1490  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1491  const auto& cached_chunks = multi_threading_params.cached_chunks;
1492  if (!column_type.is_varlen_indeed()) {
1493  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
1494  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
1495  chunk_entry != cached_chunks.end()) {
1496  auto cached_buffer = chunk_entry->second.getBuffer();
1497  CHECK(cached_buffer);
1498  chunk_metadata->numBytes = cached_buffer->size();
1499  buffer->setSize(cached_buffer->size());
1500  } else {
1501  chunk_metadata->numBytes = buffer->size();
1502  }
1503  chunk_metadata_map_[chunk_key] = chunk_metadata;
1504  }
1505 
1506  for (auto column : columns) {
1507  if (skip_metadata_scan(column)) {
1509  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1510  }
1511  }
1512 
1513  if (!rolled_off_files.empty()) {
1514  updateRolledOffChunks(rolled_off_files, column_by_id);
1515  }
1516 
1517  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1518  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1519  }
1520 
1521  // Save chunk data
1522  if (foreign_table_->isAppendMode()) {
1523  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1524  }
1525 }
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void updateRolledOffChunks(const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
#define UNREACHABLE()
Definition: Logger.h:338
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
virtual std::optional< size_t > getMaxFileCount() const
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id,
AbstractBuffer *  delete_buffer 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map- map of column id to chunks to be populated
fragment_id- fragment id of given chunks
delete_buffer- optional buffer to store deleted row indices

Definition at line 339 of file AbstractTextFileDataWrapper.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, db_id_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), Data_Namespace::AbstractBuffer::getMemoryPtr(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, iterativeFileScan(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, multi_threading_params_, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::parser, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error(), UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunkBuffers().

342  {
343  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
344  CHECK(!column_id_to_chunk_map.empty());
345 
346  // check to see if an iterative scan step is required
347  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
348  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
350  // check to see if there is more foreign data to scan
353  // NOTE: we can only guarantee that the iterative scan of the current
354  // `fragment_id` is fully done if either
355  // 1) the scan is finished OR
356  // 2) `fragment_id+1` exists in the internal map
357  // this is why `fragment_id+1` is checked for below
358  auto file_regions_it_one_ahead =
359  fragment_id_to_file_regions_map_.find(fragment_id + 1);
361  (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
362  ChunkMetadataVector chunk_metadata_vector;
363  IterativeFileScanParameters iterative_params{
364  column_id_to_chunk_map, fragment_id, delete_buffer};
365  iterativeFileScan(chunk_metadata_vector, iterative_params);
366  }
367  }
368 
369  file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
370  if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
372  is_file_scan_in_progress_ = false; // conclude the iterative scan is finished
374  true; // any subsequent iterative request can assume they will be the first
376  foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
377  } else {
378  // iterative scan is required to have loaded all required chunks thus we
379  // can exit early
380  return;
381  }
382  }
383  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());
384 
385  const auto& file_regions = file_regions_it->second;
386 
387  // File roll off can lead to empty file regions.
388  if (file_regions.empty()) {
389  return;
390  }
391 
392  const auto buffer_size = get_buffer_size(file_regions);
393  const auto thread_count = get_thread_count(copy_params, file_regions);
394 
395  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
396 
397  std::vector<ParseBufferRequest> parse_file_requests{};
398  parse_file_requests.reserve(thread_count);
399  std::set<int> column_filter_set;
400  for (const auto& pair : column_id_to_chunk_map) {
401  column_filter_set.insert(pair.first);
402  }
403 
404  std::vector<std::unique_ptr<FileReader>> file_readers;
405  rapidjson::Value reader_metadata(rapidjson::kObjectType);
406  rapidjson::Document d;
407  auto& server_options = foreign_table_->foreign_server->options;
409  file_reader_->serialize(reader_metadata, d.GetAllocator());
410  const auto file_path = getFullFilePath(foreign_table_);
411  auto& parser = getFileBufferParser();
412  std::vector<size_t> start_indices, end_indices;
413 
414  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
415  parse_file_requests.emplace_back(buffer_size,
416  copy_params,
417  db_id_,
419  column_filter_set,
420  file_path,
421  delete_buffer != nullptr);
422  auto start_index = i;
423  start_indices.emplace_back(start_index);
424  end_indices.emplace_back(
425  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1));
426 
427  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
428  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
429  file_path, copy_params, reader_metadata));
430  } else {
431  UNREACHABLE();
432  }
433  }
434 
435  CHECK_EQ(start_indices.size(), file_readers.size());
436  CHECK_EQ(start_indices.size(), parse_file_requests.size());
437  CHECK_EQ(start_indices.size(), end_indices.size());
438 
439  // All objects that futures depend on and/or that are passed by reference
440  // should be defined and/or instantiated before the loop that creates the
441  // futures (below) to avoid possible issues related to exception propagation
442  std::vector<std::future<ParseFileRegionResult>> futures{};
443  for (size_t i = 0; i < file_readers.size(); i++) {
444  futures.emplace_back(std::async(std::launch::async,
446  std::ref(file_regions),
447  start_indices[i],
448  end_indices[i],
449  std::ref(*(file_readers[i])),
450  std::ref(parse_file_requests[i]),
451  std::ref(column_id_to_chunk_map),
452  std::ref(parser)));
453  }
454 
455  for (auto& future : futures) {
456  future.wait();
457  }
458 
459  std::vector<ParseFileRegionResult> load_file_region_results{};
460  for (auto& future : futures) {
461  load_file_region_results.emplace_back(future.get());
462  }
463 
464  std::set<size_t> chunk_rejected_row_indices;
465  size_t chunk_offset = 0;
466  for (auto result : load_file_region_results) {
467  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
468  chunk.appendData(
469  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
470  }
471  for (const auto& rejected_row_index : result.rejected_row_indices) {
472  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
473  }
474  chunk_offset += result.row_count;
475  }
476 
477  if (delete_buffer) {
478  // ensure delete buffer is sized appropriately
479  resize_delete_buffer(delete_buffer, chunk_offset);
480 
481  auto delete_buffer_data = delete_buffer->getMemoryPtr();
482  for (const auto rejected_row_index : chunk_rejected_row_indices) {
483  delete_buffer_data[rejected_row_index] = true;
484  }
485  }
486 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
void resize_delete_buffer(AbstractBuffer *delete_buffer, const size_t chunk_element_count)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:338
virtual const TextFileBufferParser & getFileBufferParser() const =0
void iterativeFileScan(ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void throw_fragment_id_out_of_bounds_error(const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
#define CHECK(condition)
Definition: Logger.h:291
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path- location of file created by serializeMetadata
chunk_metadata- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1712 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, json_utils::get_value_from_object(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1714  {
1715  auto d = json_utils::read_from_file(file_path);
1716  CHECK(d.IsObject());
1717 
1718  // Restore fragment map
1720  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1721 
1722  // Construct reader with metadata
1723  CHECK(d.HasMember("reader_metadata"));
1724  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1725  const auto full_file_path = getFullFilePath(foreign_table_);
1726  auto& server_options = foreign_table_->foreign_server->options;
1727  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1728  file_reader_ = std::make_unique<LocalMultiFileReader>(
1729  full_file_path, copy_params, d["reader_metadata"]);
1730  } else {
1731  UNREACHABLE();
1732  }
1733 
1735  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1736 
1737  // Now restore the internal metadata maps
1738  CHECK(chunk_metadata_map_.empty());
1739  CHECK(chunk_encoder_buffers_.empty());
1740 
1741  for (auto& pair : chunk_metadata) {
1742  chunk_metadata_map_[pair.first] = pair.second;
1743 
1744  if (foreign_table_->isAppendMode()) {
1745  // Restore encoder state for append mode
1746  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1747  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1748  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1749  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1750  pair.second->numElements);
1751  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1752  pair.second->chunkStats);
1753  chunk_encoder_buffers_[pair.first]->setUpdated();
1754  }
1755  }
1756  is_restored_ = true;
1757 }
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270
#define UNREACHABLE()
Definition: Logger.h:338
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
virtual const TextFileBufferParser & getFileBufferParser() const =0
rapidjson::Document read_from_file(const std::string &file_path)
Definition: JsonUtils.cpp:201
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
bool isAppendMode() const
Checks if the table is in append mode.
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
#define CHECK(condition)
Definition: Logger.h:291
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 147 of file AbstractTextFileDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

149  {
151  CHECK(catalog);
152  for (auto& entry : column_id_to_chunk_map) {
153  const auto& column =
154  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
155  if (skip_metadata_scan(column)) {
156  ChunkKey data_chunk_key = {
157  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
158  if (column->columnType.is_varlen_indeed()) {
159  data_chunk_key.emplace_back(1);
160  }
161  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
162  // Allocate new shared_ptr for metadata so we don't modify the old one which may be
163  // used by executor
164  auto cached_metadata_previous =
165  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
166  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
167  std::make_shared<ChunkMetadata>();
168  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
169  *cached_metadata = *cached_metadata_previous;
170  auto chunk_metadata =
171  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
172  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
173  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
174  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
175  cached_metadata->numBytes = entry.second.getBuffer()->size();
176  }
177  }
178 }
std::vector< int > ChunkKey
Definition: types.h:36
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateRolledOffChunks ( const std::set< std::string > &  rolled_off_files,
const std::map< int32_t, const ColumnDescriptor * > &  column_by_id 
)
private

Definition at line 1645 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, shared::contains(), fragment_id_to_file_regions_map_, shared::get_from_map(), and foreign_storage::get_placeholder_metadata().

Referenced by populateChunkMetadata().

1647  {
1648  std::set<int32_t> deleted_fragment_ids;
1649  std::optional<int32_t> partially_deleted_fragment_id;
1650  std::optional<size_t> partially_deleted_fragment_row_count;
1651  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
1652  bool file_region_deleted{false};
1653  for (auto it = file_regions.begin(); it != file_regions.end();) {
1654  if (shared::contains(rolled_off_files, it->file_path)) {
1655  it = file_regions.erase(it);
1656  file_region_deleted = true;
1657  } else {
1658  it++;
1659  }
1660  }
1661  if (file_regions.empty()) {
1662  deleted_fragment_ids.emplace(fragment_id);
1663  } else if (file_region_deleted) {
1664  partially_deleted_fragment_id = fragment_id;
1665  partially_deleted_fragment_row_count = 0;
1666  for (const auto& file_region : file_regions) {
1667  partially_deleted_fragment_row_count.value() += file_region.row_count;
1668  }
1669  break;
1670  }
1671  }
1672 
1673  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1674  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
1675  chunk_metadata->numElements = 0;
1676  chunk_metadata->numBytes = 0;
1677  } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
1678  CHECK(partially_deleted_fragment_row_count.has_value());
1679  auto old_chunk_stats = chunk_metadata->chunkStats;
1680  auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
1681  chunk_metadata = get_placeholder_metadata(
1682  cd->columnType, partially_deleted_fragment_row_count.value());
1683  // Old chunk stats will still be correct (since only row deletion is occurring)
1684  // and more accurate than that of the placeholder metadata.
1685  chunk_metadata->chunkStats = old_chunk_stats;
1686  }
1687  }
1688 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::AbstractTextFileDataWrapper::append_start_offset_
private
size_t foreign_storage::AbstractTextFileDataWrapper::buffer_size_
private

Definition at line 198 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::AbstractTextFileDataWrapper::chunk_encoder_buffers_
private
std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::AbstractTextFileDataWrapper::chunk_metadata_map_
private
const int foreign_storage::AbstractTextFileDataWrapper::db_id_
private
const bool foreign_storage::AbstractTextFileDataWrapper::disable_cache_
private

Definition at line 188 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().

std::unique_ptr<FileReader> foreign_storage::AbstractTextFileDataWrapper::file_reader_
private
const ForeignTable* foreign_storage::AbstractTextFileDataWrapper::foreign_table_
private
std::map<int, FileRegions> foreign_storage::AbstractTextFileDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_file_scan_in_progress_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_first_file_scan_call_
private

Definition at line 190 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

bool foreign_storage::AbstractTextFileDataWrapper::is_restored_
private

Definition at line 183 of file AbstractTextFileDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::AbstractTextFileDataWrapper::iterative_scan_last_fragment_id_
private

Definition at line 194 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

MetadataScanMultiThreadingParams foreign_storage::AbstractTextFileDataWrapper::multi_threading_params_
private

Definition at line 197 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

size_t foreign_storage::AbstractTextFileDataWrapper::num_rows_
private
ResidualBuffer foreign_storage::AbstractTextFileDataWrapper::residual_buffer_
private

Definition at line 201 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

size_t foreign_storage::AbstractTextFileDataWrapper::thread_count_
private

Definition at line 199 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

const UserMapping* foreign_storage::AbstractTextFileDataWrapper::user_mapping_
private

Definition at line 185 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().


The documentation for this class was generated from the following files: