OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::AbstractTextFileDataWrapper Class Reference (abstract)

#include <AbstractTextFileDataWrapper.h>

+ Inheritance diagram for foreign_storage::AbstractTextFileDataWrapper:
+ Collaboration diagram for foreign_storage::AbstractTextFileDataWrapper:

Classes

struct  ResidualBuffer
 

Public Member Functions

 AbstractTextFileDataWrapper ()
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
 AbstractTextFileDataWrapper (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping, const bool disable_cache)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
void createRenderGroupAnalyzers () override
 Create RenderGroupAnalyzers for poly columns. More...
 
bool isLazyFragmentFetchingEnabled () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Protected Member Functions

virtual const
TextFileBufferParser & 
getFileBufferParser () const =0
 
virtual std::optional< size_t > getMaxFileCount () const
 

Private Member Functions

 AbstractTextFileDataWrapper (const ForeignTable *foreign_table)
 
void iterativeFileScan (ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
 
void populateChunks (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
 
void populateChunkMapForColumns (const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
 
void updateMetadata (std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
 
void updateRolledOffChunks (const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
 

Private Attributes

std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
std::map< int, FileRegions > fragment_id_to_file_regions_map_
 
std::unique_ptr< FileReader > file_reader_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
std::map< ChunkKey,
std::unique_ptr
< ForeignStorageBuffer > > 
chunk_encoder_buffers_
 
size_t num_rows_
 
size_t append_start_offset_
 
bool is_restored_
 
const UserMapping * user_mapping_
 
const bool disable_cache_
 
bool is_first_file_scan_call_
 
bool is_file_scan_in_progress_
 
RenderGroupAnalyzerMap render_group_analyzer_map_
 
MetadataScanMultiThreadingParams multi_threading_params_
 
size_t buffer_size_
 
size_t thread_count_
 
ResidualBuffer residual_buffer_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 92 of file AbstractTextFileDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping,
const bool  disable_cache 
)
foreign_storage::AbstractTextFileDataWrapper::AbstractTextFileDataWrapper ( const ForeignTable * foreign_table)
private

Member Function Documentation

void foreign_storage::AbstractTextFileDataWrapper::createRenderGroupAnalyzers ( )
overridevirtual

Create RenderGroupAnalyzers for poly columns.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 1633 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_GE, db_id_, foreign_table_, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), IS_GEO_POLY, render_group_analyzer_map_, and TableDescriptor::tableId.

1633  {
1634  // must have these
1635  CHECK_GE(db_id_, 0);
1637 
1638  // populate map for all poly columns in this table
1640  CHECK(catalog);
1641  auto columns =
1642  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1643  for (auto const& column : columns) {
1644  if (IS_GEO_POLY(column->columnType.get_type())) {
1646  .try_emplace(column->columnId,
1647  std::make_unique<import_export::RenderGroupAnalyzer>())
1648  .second);
1649  }
1650  }
1651 }
#define CHECK_GE(x, y)
Definition: Logger.h:235
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:222
#define IS_GEO_POLY(T)
Definition: sqltypes.h:328

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 115 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

virtual const TextFileBufferParser& foreign_storage::AbstractTextFileDataWrapper::getFileBufferParser ( ) const
protectedpure virtual

Implemented in foreign_storage::CsvDataWrapper, foreign_storage::RegexParserDataWrapper, and foreign_storage::InternalLogsDataWrapper.

Referenced by iterativeFileScan(), populateChunkMetadata(), populateChunks(), and restoreDataWrapperInternals().

+ Here is the caller graph for this function:

std::optional< size_t > foreign_storage::AbstractTextFileDataWrapper::getMaxFileCount ( ) const
protectedvirtual

Reimplemented in foreign_storage::InternalLogsDataWrapper.

Definition at line 1653 of file AbstractTextFileDataWrapper.cpp.

Referenced by iterativeFileScan(), and populateChunkMetadata().

1653  {
1654  return {};
1655 }

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::AbstractTextFileDataWrapper::getNonCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 117 of file AbstractTextFileDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::string foreign_storage::AbstractTextFileDataWrapper::getSerializedDataWrapper ( ) const
overridevirtual

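Serializes the internal state of the wrapper into a string, if implemented. This implementation returns a JSON document containing the fragment-to-file-regions map, the file reader metadata, the row count, and the append start offset.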

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1557 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), append_start_offset_, file_reader_, fragment_id_to_file_regions_map_, num_rows_, and foreign_storage::json_utils::write_to_string().

1557  {
1558  rapidjson::Document d;
1559  d.SetObject();
1560 
1561  // Save fragment map
1564  "fragment_id_to_file_regions_map",
1565  d.GetAllocator());
1566 
1567  // Save reader metadata
1568  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1569  file_reader_->serialize(reader_metadata, d.GetAllocator());
1570  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1571 
1572  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1574  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1575 
1576  return json_utils::write_to_string(d);
1577 }
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:157
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

bool foreign_storage::AbstractTextFileDataWrapper::isLazyFragmentFetchingEnabled ( ) const
inlineoverridevirtual

If true, the data wrapper implements a lazy fragment fetching mode. This mode allows requests for fragments to be issued to populateChunks without the prerequisite that populateChunkMetadata has successfully finished execution. This is an optimization that has some specific use-cases and is not required.

NOTE: this mode is not guaranteed to work as expected when combined with certain types of refresh modes such as append. This is subject to change in the future, but has no impact on the intended use-cases of this mode.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 123 of file AbstractTextFileDataWrapper.h.

123 { return true; }
bool foreign_storage::AbstractTextFileDataWrapper::isRestored ( ) const
overridevirtual
void foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan ( ChunkMetadataVector & chunk_metadata_vector,
IterativeFileScanParameters & file_scan_param 
)
private

Implements an iterative file scan that enables populating chunks fragment by fragment.

Definition at line 1395 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, threading_serial::async(), buffer_size_, CHECK, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, foreign_storage::ForeignTable::isAppendMode(), multi_threading_params_, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::populate_chunks(), render_group_analyzer_map_, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::reset_multithreading_params(), residual_buffer_, TableDescriptor::tableId, thread_count_, user_mapping_, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunks().

1397  {
1398  auto timer = DEBUG_TIMER(__func__);
1399 
1401 
1403  << " iterative file scan can not be used with APPEND mode.";
1404 
1405  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1406  const auto file_path = getFullFilePath(foreign_table_);
1408  CHECK(catalog);
1409  auto& parser = getFileBufferParser();
1410  const auto file_path_options = getFilePathOptions(foreign_table_);
1411  auto& server_options = foreign_table_->foreign_server->options;
1412 
1417  server_options,
1418  file_reader_,
1419  file_path,
1420  copy_params,
1421  file_path_options,
1422  getMaxFileCount(),
1424  user_mapping_,
1425  parser,
1426  [this] { return ""; },
1427  num_rows_,
1429  }
1430 
1431  auto columns =
1432  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1433  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1434  for (auto column : columns) {
1435  column_by_id[column->columnId] = column;
1436  }
1437 
1438  if (is_first_file_scan_call_) { // reinitialize all members that may have state in
1439  // `multi_threading_params_`
1441  }
1442 
1444 
1445  std::set<int> columns_to_scan;
1446  for (auto column : columns) {
1447  columns_to_scan.insert(column->columnId);
1448  }
1449 
1452  // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
1453  // scan
1454  buffer_size_ = get_buffer_size(copy_params,
1455  file_reader_->isRemainingSizeKnown(),
1456  file_reader_->getRemainingSize());
1457  thread_count_ = get_thread_count(copy_params,
1458  file_reader_->isRemainingSizeKnown(),
1459  file_reader_->getRemainingSize(),
1460  buffer_size_);
1461  }
1463 
1464  std::vector<std::future<void>> futures{};
1465  for (size_t i = 0; i < thread_count_; i++) {
1468  copy_params,
1469  db_id_,
1471  columns_to_scan,
1474  true);
1475  }
1476  futures.emplace_back(std::async(std::launch::async,
1478  std::ref(multi_threading_params_),
1480  std::ref(parser),
1481  std::ref(file_scan_param)));
1482  }
1483 
1485  buffer_size_,
1486  file_path,
1487  (*file_reader_),
1488  copy_params,
1490  num_rows_,
1493  &file_scan_param,
1496 
1497  for (auto& future : futures) {
1498  // get() instead of wait() because we need to propagate potential exceptions.
1499  future.get();
1500  }
1501  }
1502 
1504  is_first_file_scan_call_ = false;
1505  }
1506 
1509  }
1510 }
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void reset_multithreading_params(foreign_storage::MetadataScanMultiThreadingParams &multi_threading_params)
virtual const TextFileBufferParser & getFileBufferParser() const =0
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:341
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void populate_chunks(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser, foreign_storage::IterativeFileScanParameters &file_scan_param)
virtual std::optional< size_t > getMaxFileCount() const
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 116 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, db_id_, DEBUG_TIMER, foreign_table_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), is_file_scan_in_progress_, populateChunkMapForColumns(), populateChunks(), TableDescriptor::tableId, and updateMetadata().

119  {
120  auto timer = DEBUG_TIMER(__func__);
122  CHECK(catalog);
123  CHECK(!required_buffers.empty());
124 
125  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
126  auto required_columns =
127  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
128  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
130  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
131 
132  if (!optional_buffers.empty()) {
133  auto optional_columns =
134  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
136  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
137  }
138  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
140  updateMetadata(column_id_to_chunk_map, fragment_id);
141  }
142 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
static SysCatalog & instance()
Definition: SysCatalog.h:341
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMapForColumns ( const std::set< const ColumnDescriptor * > &  columns,
const int  fragment_id,
const ChunkToBufferMap & buffers,
std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map 
)
private

Definition at line 101 of file AbstractTextFileDataWrapper.cpp.

References chunk_metadata_map_, db_id_, foreign_table_, foreign_storage::init_chunk_for_column(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

105  {
106  for (const auto column : columns) {
107  ChunkKey data_chunk_key = {
108  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
109  init_chunk_for_column(data_chunk_key,
111  buffers,
112  column_id_to_chunk_map[column->columnId]);
113  }
114 }
std::vector< int > ChunkKey
Definition: types.h:36
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
overridevirtual

Populates provided chunk metadata vector with metadata for table specified in given chunk key. Metadata scan for text file(s) configured for foreign table occurs in parallel whenever appropriate. Parallel processing involves the main thread creating ParseBufferRequest objects, which contain buffers with text content read from file and adding these request objects to a queue that is consumed by a fixed number of threads. After request processing, request objects are put back into a pool for reuse for subsequent requests in order to avoid unnecessary allocation of new buffers.

Parameters
chunk_metadata_vector - vector to be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1242 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata(), foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), append_start_offset_, threading_serial::async(), foreign_storage::MetadataScanMultiThreadingParams::cached_chunks, CHECK, CHECK_EQ, foreign_storage::MultiFileReader::checkForRolledOffFiles(), foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, chunk_encoder_buffers_, CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, foreign_storage::MetadataScanMultiThreadingParams::continue_processing, db_id_, DEBUG_TIMER, foreign_storage::MetadataScanMultiThreadingParams::disable_cache, disable_cache_, foreign_storage::dispatch_scan_requests_with_exception_handling(), file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), Catalog_Namespace::SysCatalog::getCatalog(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), getMaxFileCount(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, run_benchmark_import::parser, foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::scan_metadata(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, TableDescriptor::tableId, UNREACHABLE, updateRolledOffChunks(), user_mapping_, and 
foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1243  {
1244  auto timer = DEBUG_TIMER(__func__);
1245 
1246  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1247  const auto file_path = getFullFilePath(foreign_table_);
1249  CHECK(catalog);
1250  auto& parser = getFileBufferParser();
1251  const auto file_path_options = getFilePathOptions(foreign_table_);
1252  auto& server_options = foreign_table_->foreign_server->options;
1253  std::set<std::string> rolled_off_files;
1254  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
1255  auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
1256  if (allowFileRollOff(foreign_table_) && multi_file_reader) {
1257  rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
1258  }
1259  parser.validateFiles(file_reader_.get(), foreign_table_);
1260  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1261  file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
1262  } else {
1263  UNREACHABLE();
1264  }
1265  } else {
1269  server_options,
1270  file_reader_,
1271  file_path,
1272  copy_params,
1273  file_path_options,
1274  getMaxFileCount(),
1276  user_mapping_,
1277  parser,
1278  [this] { return ""; },
1279  num_rows_,
1281  }
1282 
1283  auto columns =
1284  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1285  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1286  for (auto column : columns) {
1287  column_by_id[column->columnId] = column;
1288  }
1289  MetadataScanMultiThreadingParams multi_threading_params;
1290  multi_threading_params.disable_cache = disable_cache_;
1291 
1292  // Restore previous chunk data
1293  if (foreign_table_->isAppendMode()) {
1294  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
1295  }
1296 
1297  std::set<int> columns_to_scan;
1298  for (auto column : columns) {
1299  if (!skip_metadata_scan(column)) {
1300  columns_to_scan.insert(column->columnId);
1301  }
1302  }
1303 
1304  // Track where scan started for appends
1305  int start_row = num_rows_;
1306  if (!file_reader_->isScanFinished()) {
1307  auto buffer_size = get_buffer_size(copy_params,
1308  file_reader_->isRemainingSizeKnown(),
1309  file_reader_->getRemainingSize());
1310  auto thread_count = get_thread_count(copy_params,
1311  file_reader_->isRemainingSizeKnown(),
1312  file_reader_->getRemainingSize(),
1313  buffer_size);
1314  multi_threading_params.continue_processing = true;
1315 
1316  std::vector<std::future<void>> futures{};
1317  for (size_t i = 0; i < thread_count; i++) {
1318  multi_threading_params.request_pool.emplace(buffer_size,
1319  copy_params,
1320  db_id_,
1322  columns_to_scan,
1324  nullptr,
1325  disable_cache_);
1326 
1327  futures.emplace_back(std::async(std::launch::async,
1328  scan_metadata,
1329  std::ref(multi_threading_params),
1331  std::ref(parser)));
1332  }
1333 
1334  ResidualBuffer residual_buffer;
1336  buffer_size,
1337  file_path,
1338  (*file_reader_),
1339  copy_params,
1340  multi_threading_params,
1341  num_rows_,
1344  nullptr,
1345  residual_buffer,
1346  true);
1347 
1348  for (auto& future : futures) {
1349  // get() instead of wait() because we need to propagate potential exceptions.
1350  future.get();
1351  }
1352  }
1353 
1354  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1355  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
1356  CHECK(column_entry != column_by_id.end());
1357  const auto& column_type = column_entry->second->columnType;
1358  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
1359  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1360  const auto& cached_chunks = multi_threading_params.cached_chunks;
1361  if (!column_type.is_varlen_indeed()) {
1362  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
1363  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
1364  chunk_entry != cached_chunks.end()) {
1365  auto buffer = chunk_entry->second.getBuffer();
1366  CHECK(buffer);
1367  chunk_metadata->numBytes = buffer->size();
1368  } else {
1369  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
1370  }
1371  chunk_metadata_map_[chunk_key] = chunk_metadata;
1372  }
1373 
1374  for (auto column : columns) {
1375  if (skip_metadata_scan(column)) {
1377  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1378  }
1379  }
1380 
1381  if (!rolled_off_files.empty()) {
1382  updateRolledOffChunks(rolled_off_files, column_by_id);
1383  }
1384 
1385  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1386  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1387  }
1388 
1389  // Save chunk data
1390  if (foreign_table_->isAppendMode()) {
1391  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1392  }
1393 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void updateRolledOffChunks(const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
#define UNREACHABLE()
Definition: Logger.h:266
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:341
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
virtual std::optional< size_t > getMaxFileCount() const
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::populateChunks ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id,
AbstractBuffer *  delete_buffer 
)
private

Populates provided chunks with appropriate data by parsing all file regions containing chunk data.

Parameters
column_id_to_chunk_map- map of column id to chunks to be populated
fragment_id- fragment id of given chunks
delete_buffer- optional buffer to store deleted row indices

Definition at line 336 of file AbstractTextFileDataWrapper.cpp.

References threading_serial::async(), CHECK, db_id_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), Data_Namespace::AbstractBuffer::getMemoryPtr(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished(), is_file_scan_in_progress_, is_first_file_scan_call_, iterativeFileScan(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, multi_threading_params_, foreign_storage::OptionsContainer::options, foreign_storage::parse_file_regions(), run_benchmark_import::parser, render_group_analyzer_map_, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer(), run_benchmark_import::result, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error(), UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

Referenced by populateChunkBuffers().

339  {
340  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
341 
342  CHECK(!column_id_to_chunk_map.empty());
343 
344  // check to see if an iterative scan step is required
345  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
346  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
348  // check to see if there is more foreign data to scan
351  // NOTE: we can only guarantee that the iterative scan of the current
352  // `fragment_id` is fully done if either
353  // 1) the scan is finished OR
354  // 2) `fragment_id+1` exists in the internal map
355  // this is why `fragment_id+1` is checked for below
356  auto file_regions_it_one_ahead =
357  fragment_id_to_file_regions_map_.find(fragment_id + 1);
359  (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
360  ChunkMetadataVector chunk_metadata_vector;
361  IterativeFileScanParameters iterative_params{
362  column_id_to_chunk_map, fragment_id, delete_buffer};
363  iterativeFileScan(chunk_metadata_vector, iterative_params);
364  }
365  }
366 
367  file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
368  if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
370  is_file_scan_in_progress_ = false; // conclude the iterative scan is finished
372  true; // any subsequent iterative request can assume they will be the first
374  foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
375  } else {
376  // iterative scan is required to have loaded all required chunks thus we
377  // can exit early
378  return;
379  }
380  }
381  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());
382 
383  const auto& file_regions = file_regions_it->second;
384 
385  // File roll off can lead to empty file regions.
386  if (file_regions.empty()) {
387  return;
388  }
389 
390  const auto buffer_size = get_buffer_size(file_regions);
391  const auto thread_count = get_thread_count(copy_params, file_regions);
392 
393  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
394 
395  std::vector<ParseBufferRequest> parse_file_requests{};
396  parse_file_requests.reserve(thread_count);
397  std::vector<std::future<ParseFileRegionResult>> futures{};
398  std::set<int> column_filter_set;
399  for (const auto& pair : column_id_to_chunk_map) {
400  column_filter_set.insert(pair.first);
401  }
402 
403  std::vector<std::unique_ptr<FileReader>> file_readers;
404  rapidjson::Value reader_metadata(rapidjson::kObjectType);
405  rapidjson::Document d;
406  auto& server_options = foreign_table_->foreign_server->options;
407  file_reader_->serialize(reader_metadata, d.GetAllocator());
408  const auto file_path = getFullFilePath(foreign_table_);
409  auto& parser = getFileBufferParser();
410 
411  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
412  parse_file_requests.emplace_back(buffer_size,
413  copy_params,
414  db_id_,
416  column_filter_set,
417  file_path,
419  delete_buffer != nullptr);
420  auto start_index = i;
421  auto end_index =
422  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
423 
424  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
425  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
426  file_path, copy_params, reader_metadata));
427  } else {
428  UNREACHABLE();
429  }
430 
431  futures.emplace_back(std::async(std::launch::async,
433  std::ref(file_regions),
434  start_index,
435  end_index,
436  std::ref(*(file_readers.back())),
437  std::ref(parse_file_requests.back()),
438  std::ref(column_id_to_chunk_map),
439  std::ref(parser)));
440  }
441 
442  for (auto& future : futures) {
443  future.wait();
444  }
445 
446  std::vector<ParseFileRegionResult> load_file_region_results{};
447  for (auto& future : futures) {
448  load_file_region_results.emplace_back(future.get());
449  }
450 
451  std::set<size_t> chunk_rejected_row_indices;
452  size_t chunk_offset = 0;
453  for (auto result : load_file_region_results) {
454  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
455  chunk.appendData(
456  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
457  }
458  for (const auto& rejected_row_index : result.rejected_row_indices) {
459  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
460  }
461  chunk_offset += result.row_count;
462  }
463 
464  if (delete_buffer) {
465  // ensure delete buffer is sized appropriately
466  resize_delete_buffer(delete_buffer, chunk_offset);
467 
468  auto delete_buffer_data = delete_buffer->getMemoryPtr();
469  for (const auto rejected_row_index : chunk_rejected_row_indices) {
470  delete_buffer_data[rejected_row_index] = true;
471  }
472  }
473 }
void resize_delete_buffer(AbstractBuffer *delete_buffer, const size_t chunk_element_count)
virtual int8_t * getMemoryPtr()=0
#define UNREACHABLE()
Definition: Logger.h:266
virtual const TextFileBufferParser & getFileBufferParser() const =0
void iterativeFileScan(ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
MetadataScanMultiThreadingParams multi_threading_params_
future< Result > async(Fn &&fn, Args &&...args)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void throw_fragment_id_out_of_bounds_error(const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:222
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
overridevirtual

Restore internal state of datawrapper

Parameters
file_path- location of file created by serializeMetadata
chunk_metadata- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 1579 of file AbstractTextFileDataWrapper.cpp.

References append_start_offset_, CHECK, chunk_encoder_buffers_, chunk_metadata_map_, file_reader_, foreign_storage::ForeignTable::foreign_server, foreign_table_, fragment_id_to_file_regions_map_, foreign_storage::json_utils::get_value_from_object(), getFileBufferParser(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), is_restored_, foreign_storage::ForeignTable::isAppendMode(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, num_rows_, foreign_storage::OptionsContainer::options, foreign_storage::json_utils::read_from_file(), foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateAndGetCopyParams().

1581  {
1582  auto d = json_utils::read_from_file(file_path);
1583  CHECK(d.IsObject());
1584 
1585  // Restore fragment map
1587  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1588 
1589  // Construct reader with metadata
1590  CHECK(d.HasMember("reader_metadata"));
1591  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1592  const auto full_file_path = getFullFilePath(foreign_table_);
1593  auto& server_options = foreign_table_->foreign_server->options;
1594  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1595  file_reader_ = std::make_unique<LocalMultiFileReader>(
1596  full_file_path, copy_params, d["reader_metadata"]);
1597  } else {
1598  UNREACHABLE();
1599  }
1600 
1602  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1603 
1604  // Now restore the internal metadata maps
1605  CHECK(chunk_metadata_map_.empty());
1606  CHECK(chunk_encoder_buffers_.empty());
1607 
1608  for (auto& pair : chunk_metadata) {
1609  chunk_metadata_map_[pair.first] = pair.second;
1610 
1611  if (foreign_table_->isAppendMode()) {
1612  // Restore encoder state for append mode
1613  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1614  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1615  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1616  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1617  pair.second->numElements);
1618  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1619  pair.second->chunkStats);
1620  chunk_encoder_buffers_[pair.first]->setUpdated();
1621  }
1622  }
1623  is_restored_ = true;
1624 }
#define UNREACHABLE()
Definition: Logger.h:266
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
virtual const TextFileBufferParser & getFileBufferParser() const =0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:172
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
rapidjson::Document read_from_file(const std::string &file_path)
bool isAppendMode() const
Checks if the table is in append mode.
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define CHECK(condition)
Definition: Logger.h:222
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateMetadata ( std::map< int, Chunk_NS::Chunk > &  column_id_to_chunk_map,
int  fragment_id 
)
private

Definition at line 145 of file AbstractTextFileDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan(), and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

147  {
149  CHECK(catalog);
150  for (auto& entry : column_id_to_chunk_map) {
151  const auto& column =
152  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
153  if (skip_metadata_scan(column)) {
154  ChunkKey data_chunk_key = {
155  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
156  if (column->columnType.is_varlen_indeed()) {
157  data_chunk_key.emplace_back(1);
158  }
159  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
160  // Allocate new shared_ptr for metadata so we don't modify old one which may be
161  // used by executor
162  auto cached_metadata_previous =
163  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
164  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
165  std::make_shared<ChunkMetadata>();
166  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
167  *cached_metadata = *cached_metadata_previous;
168  auto chunk_metadata =
169  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
170  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
171  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
172  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
173  cached_metadata->numBytes = entry.second.getBuffer()->size();
174  }
175  }
176 }
std::vector< int > ChunkKey
Definition: types.h:36
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::AbstractTextFileDataWrapper::updateRolledOffChunks ( const std::set< std::string > &  rolled_off_files,
const std::map< int32_t, const ColumnDescriptor * > &  column_by_id 
)
private

Definition at line 1512 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, shared::contains(), fragment_id_to_file_regions_map_, shared::get_from_map(), and foreign_storage::get_placeholder_metadata().

Referenced by populateChunkMetadata().

1514  {
1515  std::set<int32_t> deleted_fragment_ids;
1516  std::optional<int32_t> partially_deleted_fragment_id;
1517  std::optional<size_t> partially_deleted_fragment_row_count;
1518  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
1519  bool file_region_deleted{false};
1520  for (auto it = file_regions.begin(); it != file_regions.end();) {
1521  if (shared::contains(rolled_off_files, it->file_path)) {
1522  it = file_regions.erase(it);
1523  file_region_deleted = true;
1524  } else {
1525  it++;
1526  }
1527  }
1528  if (file_regions.empty()) {
1529  deleted_fragment_ids.emplace(fragment_id);
1530  } else if (file_region_deleted) {
1531  partially_deleted_fragment_id = fragment_id;
1532  partially_deleted_fragment_row_count = 0;
1533  for (const auto& file_region : file_regions) {
1534  partially_deleted_fragment_row_count.value() += file_region.row_count;
1535  }
1536  break;
1537  }
1538  }
1539 
1540  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1541  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
1542  chunk_metadata->numElements = 0;
1543  chunk_metadata->numBytes = 0;
1544  } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
1545  CHECK(partially_deleted_fragment_row_count.has_value());
1546  auto old_chunk_stats = chunk_metadata->chunkStats;
1547  auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
1548  chunk_metadata = get_placeholder_metadata(
1549  cd->columnType, partially_deleted_fragment_row_count.value());
1550  // Old chunk stats will still be correct (since only row deletion is occurring)
1551  // and more accurate than that of the placeholder metadata.
1552  chunk_metadata->chunkStats = old_chunk_stats;
1553  }
1554  }
1555 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:222
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

size_t foreign_storage::AbstractTextFileDataWrapper::append_start_offset_
private
size_t foreign_storage::AbstractTextFileDataWrapper::buffer_size_
private

Definition at line 202 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer> > foreign_storage::AbstractTextFileDataWrapper::chunk_encoder_buffers_
private
std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::AbstractTextFileDataWrapper::chunk_metadata_map_
private
const int foreign_storage::AbstractTextFileDataWrapper::db_id_
private
const bool foreign_storage::AbstractTextFileDataWrapper::disable_cache_
private

Definition at line 190 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().

std::unique_ptr<FileReader> foreign_storage::AbstractTextFileDataWrapper::file_reader_
private
std::map<int, FileRegions> foreign_storage::AbstractTextFileDataWrapper::fragment_id_to_file_regions_map_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_file_scan_in_progress_
private
bool foreign_storage::AbstractTextFileDataWrapper::is_first_file_scan_call_
private

Definition at line 192 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

bool foreign_storage::AbstractTextFileDataWrapper::is_restored_
private

Definition at line 185 of file AbstractTextFileDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

MetadataScanMultiThreadingParams foreign_storage::AbstractTextFileDataWrapper::multi_threading_params_
private

Definition at line 201 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunks().

size_t foreign_storage::AbstractTextFileDataWrapper::num_rows_
private
RenderGroupAnalyzerMap foreign_storage::AbstractTextFileDataWrapper::render_group_analyzer_map_
private
ResidualBuffer foreign_storage::AbstractTextFileDataWrapper::residual_buffer_
private

Definition at line 205 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

size_t foreign_storage::AbstractTextFileDataWrapper::thread_count_
private

Definition at line 203 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan().

const UserMapping* foreign_storage::AbstractTextFileDataWrapper::user_mapping_
private

Definition at line 187 of file AbstractTextFileDataWrapper.h.

Referenced by iterativeFileScan(), and populateChunkMetadata().


The documentation for this class was generated from the following files: