OmniSciDB  a667adc9c8
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>


Public Member Functions

 ParquetDataWrapper ()
 
 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set< std::string_view > & getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set< std::string_view > & getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

std::list< const ColumnDescriptor * > getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers)
 
std::set< std::string > getProcessedFilePaths ()
 
std::set< std::string > getAllFilePaths ()
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::set< std::string > &file_paths)
 

Private Attributes

std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
 
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr< ForeignTableSchema > schema_
 
std::shared_ptr< arrow::fs::FileSystem > file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::array< std::string, 1 > supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 35 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( )

Definition at line 73 of file ParquetDataWrapper.cpp.

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int db_id, const ForeignTable * foreign_table )

Definition at line 75 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

76  : db_id_(db_id)
77  , foreign_table_(foreign_table)
78  , last_fragment_index_(0)
79  , last_fragment_row_count_(0)
80  , total_row_count_(0)
81  , last_row_group_(0)
82  , is_restored_(false)
83  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
84  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
85  auto& server_options = foreign_table->foreign_server->options;
86  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
87  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
88  } else {
89  UNREACHABLE();
90  }
91 }
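
A hypothetical construction sketch (not taken from the OmniSciDB sources; db_id and foreign_table are assumed to have been resolved through the catalog). Note that the foreign server backing the table must declare STORAGE_TYPE = LOCAL_FILE, otherwise the constructor reaches UNREACHABLE():

// Sketch only: one wrapper instance per (database id, foreign table) pair.
// `db_id` and `foreign_table` are placeholders resolved elsewhere.
auto wrapper =
    std::make_unique<foreign_storage::ParquetDataWrapper>(db_id, foreign_table);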

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 204 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

204  {
205  const auto last_fragment_entry =
206  fragment_to_row_group_interval_map_.find(last_fragment_index_);
207  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
208 
209  // The entry for the first fragment starts out as an empty vector
210  if (last_fragment_entry->second.empty()) {
211  CHECK_EQ(last_fragment_index_, 0);
212  } else {
213  last_fragment_entry->second.back().end_index = last_row_group_;
214  }
215  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
216 }

void foreign_storage::ParquetDataWrapper::addNewFragment ( int row_group, const std::string & file_path )
private

Definition at line 178 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, and last_row_group_.

Referenced by metadataScanFiles().

178  {
179  const auto last_fragment_entry =
180  fragment_to_row_group_interval_map_.find(last_fragment_index_);
181  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
182 
183  last_fragment_entry->second.back().end_index = last_row_group_;
184  last_fragment_index_++;
185  last_fragment_row_count_ = 0;
186  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
187  RowGroupInterval{file_path, row_group});
188 }

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 218 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), Catalog_Namespace::SysCatalog::getCatalog(), getProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), metadataScanFiles(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_error(), and total_row_count_.

Referenced by populateChunkMetadata().

218  {
219  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
220  CHECK(catalog);
221  std::set<std::string> new_file_paths;
222  auto processed_file_paths = getProcessedFilePaths();
223  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
224  auto all_file_paths = getAllFilePaths();
225  for (const auto& file_path : processed_file_paths) {
226  if (all_file_paths.find(file_path) == all_file_paths.end()) {
227  throw_removed_file_error(file_path);
228  }
229  }
230 
231  for (const auto& file_path : all_file_paths) {
232  if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
233  new_file_paths.emplace(file_path);
234  }
235  }
236 
237  // Single file append
238  // If an append occurs with multiple files, then we assume any existing files have not
239  // been altered. If an append occurs on a single file, then we check to see if it has
240  // changed.
241  if (new_file_paths.empty() && all_file_paths.size() == 1) {
242  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
243  const auto& file_path = *all_file_paths.begin();
244  CHECK_EQ(*processed_file_paths.begin(), file_path);
245 
246  // Since an existing file is being appended to we need to update the cached
247  // FileReader as the existing one will be out of date.
248  auto reader = file_reader_cache_->insert(file_path, file_system_);
249  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
250 
251  if (row_count < total_row_count_) {
252  throw_removed_row_error(file_path);
253  } else if (row_count > total_row_count_) {
254  new_file_paths = all_file_paths;
255  chunk_metadata_map_.clear();
256  resetParquetMetadata();
257  }
258  }
259  } else {
260  new_file_paths = getAllFilePaths();
261  chunk_metadata_map_.clear();
262  resetParquetMetadata();
263  }
264 
265  if (!new_file_paths.empty()) {
266  metadataScanFiles(new_file_paths);
267  }
268 }

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 173 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

173  {
174  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
175  last_row_group_;
176 }

std::set< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 280 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, file_system_, foreign_table_, and foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath().

Referenced by fetchChunkMetadata().

280  {
281  auto timer = DEBUG_TIMER(__func__);
282  std::set<std::string> file_paths;
283  arrow::fs::FileSelector file_selector{};
284  std::string base_path = getFullFilePath(foreign_table_);
285  file_selector.base_dir = base_path;
286  file_selector.recursive = true;
287 
288  auto file_info_result = file_system_->GetFileInfo(file_selector);
289  if (!file_info_result.ok()) {
290  // This is expected when `base_path` points to a single file.
291  file_paths.emplace(base_path);
292  } else {
293  auto& file_info_vector = file_info_result.ValueOrDie();
294  for (const auto& file_info : file_info_vector) {
295  if (file_info.type() == arrow::fs::FileType::File) {
296  file_paths.emplace(file_info.path());
297  }
298  }
299  if (file_paths.empty()) {
300  throw std::runtime_error{"No file found at given path \"" + base_path + "\"."};
301  }
302  }
303  return file_paths;
304 }

ParallelismLevel foreign_storage::ParquetDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 54 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTER_FRAGMENT.
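
The inline body is not reproduced on this page; based on the reference to INTER_FRAGMENT above, it presumably amounts to the following sketch (reconstructed, not copied from the header):

ParallelismLevel getCachedParallelismLevel() const override { return INTER_FRAGMENT; }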

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 104 of file ParquetDataWrapper.cpp.

References CHECK, db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

105  {
106  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
107  CHECK(catalog);
108  const auto& columns = schema_->getLogicalAndPhysicalColumns();
109  auto column_start = column_interval.start;
110  auto column_end = column_interval.end;
111  std::list<const ColumnDescriptor*> columns_to_init;
112  for (const auto column : columns) {
113  auto column_id = column->columnId;
114  if (column_id >= column_start && column_id <= column_end) {
115  columns_to_init.push_back(column);
116  }
117  }
118  return columns_to_init;
119 }

ParallelismLevel foreign_storage::ParquetDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 56 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.
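
As above, the inline body is not shown here; given the reference to INTRA_FRAGMENT, it presumably reduces to this sketch (reconstructed, not copied from the header):

ParallelismLevel getNonCachedParallelismLevel() const override { return INTRA_FRAGMENT; }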

std::set< std::string > foreign_storage::ParquetDataWrapper::getProcessedFilePaths ( )
private

Definition at line 270 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

270  {
271  std::set<std::string> file_paths;
272  for (const auto& entry : fragment_to_row_group_interval_map_) {
273  for (const auto& row_group_interval : entry.second) {
274  file_paths.emplace(row_group_interval.file_path);
275  }
276  }
277  return file_paths;
278 }

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int fragment_index, const Interval< ColumnType > & column_interval, const ChunkToBufferMap & required_buffers, const bool reserve_buffers_and_set_stats = false )
private

Definition at line 121 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

125  {
126  for (const auto column : getColumnsToInitialize(column_interval)) {
127  Chunk_NS::Chunk chunk{column};
128  ChunkKey data_chunk_key;
129  if (column->columnType.is_varlen_indeed()) {
130  data_chunk_key = {
131  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
132  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
133  auto data_buffer = required_buffers.at(data_chunk_key);
134  chunk.setBuffer(data_buffer);
135 
136  ChunkKey index_chunk_key{
137  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
138  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
139  auto index_buffer = required_buffers.at(index_chunk_key);
140  chunk.setIndexBuffer(index_buffer);
141  } else {
142  data_chunk_key = {
143  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
144  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
145  auto data_buffer = required_buffers.at(data_chunk_key);
146  chunk.setBuffer(data_buffer);
147  }
148  chunk.initEncoder();
149  if (reserve_buffers_and_set_stats) {
150  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
151  CHECK(metadata_it != chunk_metadata_map_.end());
152  auto buffer = chunk.getBuffer();
153  auto& metadata = metadata_it->second;
154  auto encoder = buffer->getEncoder();
155  encoder->resetChunkStats(metadata->chunkStats);
156  encoder->setNumElems(metadata->numElements);
157  if (column->columnType.is_string() &&
158  column->columnType.get_compression() == kENCODING_NONE) {
159  auto index_buffer = chunk.getIndexBuf();
160  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
161  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
162  auto index_buffer = chunk.getIndexBuf();
163  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
164  } else {
165  size_t num_bytes_to_reserve =
166  metadata->numElements * column->columnType.get_size();
167  buffer->reserve(num_bytes_to_reserve);
168  }
169  }
170  }
171 }
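
For reference, the chunk keys assembled above follow the {db_id, table_id, column_id, fragment_id[, buffer_suffix]} convention, where variable-length columns carry an extra component (1 for the data buffer, 2 for the index buffer). A minimal, self-contained illustration with placeholder ids:

#include <vector>

using ChunkKey = std::vector<int>;  // as declared in types.h

int main() {
  const int db_id = 1, table_id = 10, column_id = 3, fragment_id = 0;  // placeholders
  ChunkKey varlen_data_key{db_id, table_id, column_id, fragment_id, 1};   // data buffer
  ChunkKey varlen_index_key{db_id, table_id, column_id, fragment_id, 2};  // index buffer
  ChunkKey fixed_len_key{db_id, table_id, column_id, fragment_id};        // non-varlen column
  return 0;
}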

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 190 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanFiles().

190  {
191  const auto last_fragment_entry =
192  fragment_to_row_group_interval_map_.find(last_fragment_index_);
193  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
194 
195  // The entry for the first fragment starts out as an empty vector
196  if (last_fragment_entry->second.empty()) {
197  CHECK_EQ(last_fragment_index_, 0);
198  return true;
199  } else {
200  return (last_fragment_entry->second.back().file_path != file_path);
201  }
202 }

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 553 of file ParquetDataWrapper.cpp.

References is_restored_.

553  {
554  return is_restored_;
555 }
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int logical_column_id, const int fragment_id, const ChunkToBufferMap & required_buffers )
private

Definition at line 367 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, foreign_storage::Interval< T >::end, file_reader_cache_, file_system_, foreign_table_, fragment_to_row_group_interval_map_, TableDescriptor::fragmenter, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_physical_cols(), Catalog_Namespace::SysCatalog::getCatalog(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

370  {
371  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
372  CHECK(catalog);
373  const ColumnDescriptor* logical_column =
374  schema_->getColumnDescriptor(logical_column_id);
375  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
376 
377  const Interval<ColumnType> column_interval = {
378  logical_column_id,
379  logical_column_id + logical_column->columnType.get_physical_cols()};
380  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
381 
382  const auto& row_group_intervals = fragment_to_row_group_interval_map_.at(fragment_id);
383 
384  const bool is_dictionary_encoded_string_column =
385  logical_column->columnType.is_dict_encoded_string() ||
386  (logical_column->columnType.is_array() &&
387  logical_column->columnType.get_elem_type().is_dict_encoded_string());
388 
389  StringDictionary* string_dictionary = nullptr;
390  if (is_dictionary_encoded_string_column) {
391  auto dict_descriptor = catalog->getMetadataForDictUnlocked(
392  logical_column->columnType.get_comp_param(), true);
393  CHECK(dict_descriptor);
394  string_dictionary = dict_descriptor->stringDict.get();
395  }
396 
397  std::list<Chunk_NS::Chunk> chunks;
398  for (int column_id = column_interval.start; column_id <= column_interval.end;
399  ++column_id) {
400  auto column_descriptor = schema_->getColumnDescriptor(column_id);
401  Chunk_NS::Chunk chunk{column_descriptor};
402  if (column_descriptor->columnType.is_varlen_indeed()) {
403  ChunkKey data_chunk_key = {
404  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
405  auto buffer = required_buffers.at(data_chunk_key);
406  chunk.setBuffer(buffer);
407  ChunkKey index_chunk_key = {
408  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
409  auto index_buffer = required_buffers.at(index_chunk_key);
410  chunk.setIndexBuffer(index_buffer);
411  } else {
412  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
413  auto buffer = required_buffers.at(chunk_key);
414  chunk.setBuffer(buffer);
415  }
416  chunks.emplace_back(chunk);
417  }
418 
419  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
420  auto metadata = chunk_loader.loadChunk(
421  row_group_intervals, parquet_column_index, chunks, string_dictionary);
422  auto fragmenter = foreign_table_->fragmenter;
423 
424  auto metadata_iter = metadata.begin();
425  for (int column_id = column_interval.start; column_id <= column_interval.end;
426  ++column_id, ++metadata_iter) {
427  auto column = schema_->getColumnDescriptor(column_id);
428  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
429  if (column->columnType.is_varlen_indeed()) {
430  data_chunk_key.emplace_back(1);
431  }
432  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
433 
434  // Allocate new shared_ptr for metadata so we dont modify old one which may be used by
435  // executor
436  auto cached_metadata_previous = chunk_metadata_map_.at(data_chunk_key);
437  chunk_metadata_map_.at(data_chunk_key) = std::make_shared<ChunkMetadata>();
438  auto cached_metadata = chunk_metadata_map_.at(data_chunk_key);
439  *cached_metadata = *cached_metadata_previous;
440 
441  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
442  cached_metadata->numBytes = required_buffers.at(data_chunk_key)->size();
443 
444  // for certain types, update the metadata statistics
445  // should update the fragmenter, cache, and the internal chunk_metadata_map_
446  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
447  CHECK(metadata_iter != metadata.end());
448  auto& chunk_metadata_ptr = *metadata_iter;
449  cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
450  cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
451 
452  // Update stats on buffer so it is saved in cache
453  required_buffers.at(data_chunk_key)
454  ->getEncoder()
455  ->resetChunkStats(cached_metadata->chunkStats);
456  }
457 
458  if (fragmenter) {
459  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
460  }
461  }
462 }

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::set< std::string > &  file_paths)
private

Definition at line 306 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, file_reader_cache_, file_system_, finalizeFragmentMap(), foreign_table_, isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::LazyParquetChunkLoader::metadataScan(), moveToNextFragment(), foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, TableDescriptor::tableId, and total_row_count_.

Referenced by fetchChunkMetadata().

306  {
307  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
308  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
309  auto column_interval =
310  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
311  schema_->getLogicalAndPhysicalColumns().back()->columnId};
312 
313  for (const auto& row_group_metadata_item : row_group_metadata) {
314  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
315  CHECK(static_cast<int>(column_chunk_metadata.size()) ==
316  schema_->numLogicalAndPhysicalColumns());
317  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
318  const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
319  int row_group = row_group_metadata_item.row_group_index;
320  const auto& file_path = row_group_metadata_item.file_path;
321  if (moveToNextFragment(import_row_count)) {
322  addNewFragment(row_group, file_path);
323  } else if (isNewFile(file_path)) {
324  CHECK_EQ(row_group, 0);
325  addNewFile(file_path);
326  }
327  last_row_group_ = row_group;
328 
329  for (int column_id = column_interval.start; column_id <= column_interval.end;
330  column_id++, column_chunk_metadata_iter++) {
331  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
332  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
333 
334  const auto& type_info = column_descriptor->columnType;
335  ChunkKey chunk_key{
336  db_id_, foreign_table_->tableId, column_id, last_fragment_index_};
337  ChunkKey data_chunk_key = chunk_key;
338  if (type_info.is_varlen_indeed()) {
339  data_chunk_key.emplace_back(1);
340  }
341  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
342  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
343  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
344  } else {
345  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
346  }
347  }
348  last_fragment_row_count_ += import_row_count;
349  total_row_count_ += import_row_count;
350  }
351  finalizeFragmentMap();
352 }
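
To make the bookkeeping concrete, the self-contained sketch below (hypothetical file names and row counts) illustrates the kind of mapping this scan builds: each fragment owns an ordered list of row-group intervals, and a fragment boundary can fall partway through a file once maxFragRows is exceeded.

#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for foreign_storage::RowGroupInterval, used only to
// illustrate the shape of fragment_to_row_group_interval_map_.
struct RowGroupIntervalSketch {
  std::string file_path;
  int start_index;
  int end_index;
};

int main() {
  // Assuming maxFragRows of roughly one million and two files with three
  // ~500K-row row groups each, a scan could partition row groups like this:
  std::map<int, std::vector<RowGroupIntervalSketch>> fragment_map{
      {0, {{"a.parquet", 0, 1}}},
      {1, {{"a.parquet", 2, 2}, {"b.parquet", 0, 0}}},
      {2, {{"b.parquet", 1, 2}}}};
  return 0;
}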

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 354 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanFiles().

354  {
355  return (last_fragment_row_count_ + new_rows_count) >
356  static_cast<size_t>(foreign_table_->maxFragRows);
357 }


void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers, const ChunkToBufferMap & optional_buffers )
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyways (typically for row wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 464 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, g_max_import_threads, loadBuffersUsingLazyParquetChunkLoader(), foreign_storage::partition_for_threads(), and schema_.

465  {
466  ChunkToBufferMap buffers_to_load;
467  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
468  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
469 
470  CHECK(!buffers_to_load.empty());
471 
472  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
473  for (const auto& [chunk_key, buffer] : buffers_to_load) {
474  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
475  col_frag_hints.emplace(
476  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
477  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
478  }
479 
480  auto hints_per_thread = partition_for_threads(col_frag_hints, g_max_import_threads);
481 
482  std::vector<std::future<void>> futures;
483  for (const auto& hint_set : hints_per_thread) {
484  futures.emplace_back(std::async(std::launch::async, [&, hint_set, this] {
485  for (const auto& [col_id, frag_id] : hint_set) {
486  loadBuffersUsingLazyParquetChunkLoader(col_id, frag_id, buffers_to_load);
487  }
488  }));
489  }
490 
491  for (auto& future : futures) {
492  future.get();
493  }
494 }

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector chunk_metadata_vector)
override virtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 359 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

360  {
361  fetchChunkMetadata();
362  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
363  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
364  }
365 }

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 93 of file ParquetDataWrapper.cpp.

References file_reader_cache_, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata().
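
The function body is not included in this extraction. Judging from the members it references, it presumably resets the scan bookkeeping along these lines (a sketch, not the verbatim source):

// Presumed behavior, inferred from the referenced members above:
fragment_to_row_group_interval_map_.clear();
fragment_to_row_group_interval_map_[0] = {};   // re-seed the entry for the first fragment
last_row_group_ = 0;
last_fragment_index_ = 0;
last_fragment_row_count_ = 0;
total_row_count_ = 0;
file_reader_cache_->clear();                   // assumes FileReaderMap exposes clear()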


void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string & file_path, const ChunkMetadataVector & chunk_metadata_vector )
override virtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeDataWrapperInternals
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 532 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, test_fsi::d, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

534  {
535  auto d = json_utils::read_from_file(file_path);
536  CHECK(d.IsObject());
537 
538  json_utils::get_value_from_object(
539  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
540  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
541  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
542  json_utils::get_value_from_object(
543  d, last_fragment_row_count_, "last_fragment_row_count");
544  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");
545 
546  CHECK(chunk_metadata_map_.empty());
547  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
548  chunk_metadata_map_[chunk_key] = chunk_metadata;
549  }
550  is_restored_ = true;
551 }

void foreign_storage::ParquetDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
override virtual

Serializes the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path - location to save the file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 512 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), test_fsi::d, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_file().

513  {
514  rapidjson::Document d;
515  d.SetObject();
516 
517  json_utils::add_value_to_object(d,
518  fragment_to_row_group_interval_map_,
519  "fragment_to_row_group_interval_map",
520  d.GetAllocator());
521  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
522  json_utils::add_value_to_object(
523  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
524  json_utils::add_value_to_object(
525  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
526  json_utils::add_value_to_object(
527  d, total_row_count_, "total_row_count", d.GetAllocator());
528 
529  json_utils::write_to_file(d, file_path);
530 }
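
A hypothetical round trip combining this method with restoreDataWrapperInternals() (a sketch: the file path is illustrative, and the surrounding recovery logic actually lives in the foreign storage cache and restore code):

// Persist the wrapper's scan state next to the cached chunk metadata ...
wrapper->serializeDataWrapperInternals("/tmp/parquet_wrapper_state.json");

// ... then, after recreating a wrapper for the same table, restore it.
ChunkMetadataVector recovered_metadata;  // assumed to be reloaded from the disk cache
wrapper->restoreDataWrapperInternals("/tmp/parquet_wrapper_state.json", recovered_metadata);
CHECK(wrapper->isRestored());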

Member Data Documentation

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::ParquetDataWrapper::chunk_metadata_map_
private

const int foreign_storage::ParquetDataWrapper::db_id_
private

std::unique_ptr<FileReaderMap> foreign_storage::ParquetDataWrapper::file_reader_cache_
private

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private

const ForeignTable* foreign_storage::ParquetDataWrapper::foreign_table_
private

std::map<int, std::vector<RowGroupInterval> > foreign_storage::ParquetDataWrapper::fragment_to_row_group_interval_map_
private

bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 96 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::ParquetDataWrapper::last_fragment_index_
private

size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private

int foreign_storage::ParquetDataWrapper::last_row_group_
private

std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetDataWrapper::schema_
private

size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files:

ParquetDataWrapper.h
ParquetDataWrapper.cpp