OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>

+ Inheritance diagram for foreign_storage::ParquetDataWrapper:
+ Collaboration diagram for foreign_storage::ParquetDataWrapper:

Public Member Functions

 ParquetDataWrapper ()
 
 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

std::list< const
ColumnDescriptor * > 
getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers)
 
std::set< std::string > getProcessedFilePaths ()
 
std::vector< std::string > getAllFilePaths ()
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::vector< std::string > &file_paths)
 

Private Attributes

std::map< int, std::vector
< RowGroupInterval > > 
fragment_to_row_group_interval_map_
 
std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 35 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( )

Definition at line 72 of file ParquetDataWrapper.cpp.

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table 
)

Definition at line 74 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

75  : db_id_(db_id)
76  , foreign_table_(foreign_table)
79  , total_row_count_(0)
80  , last_row_group_(0)
81  , is_restored_(false)
82  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
83  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
84  auto& server_options = foreign_table->foreign_server->options;
85  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
86  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
87  } else {
88  UNREACHABLE();
89  }
90 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:253
std::shared_ptr< arrow::fs::FileSystem > file_system_

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 203 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

203  {
204  const auto last_fragment_entry =
206  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
207 
208  // The entry for the first fragment starts out as an empty vector
209  if (last_fragment_entry->second.empty()) {
211  } else {
212  last_fragment_entry->second.back().end_index = last_row_group_;
213  }
214  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
215 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 177 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, and last_row_group_.

Referenced by metadataScanFiles().

177  {
178  const auto last_fragment_entry =
180  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
181 
182  last_fragment_entry->second.back().end_index = last_row_group_;
186  RowGroupInterval{file_path, row_group});
187 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 217 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, shared::contains(), db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), Catalog_Namespace::SysCatalog::getCatalog(), getProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), metadataScanFiles(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_error(), and total_row_count_.

Referenced by populateChunkMetadata().

217  {
219  CHECK(catalog);
220  std::vector<std::string> new_file_paths;
221  auto processed_file_paths = getProcessedFilePaths();
222  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
223  auto all_file_paths = getAllFilePaths();
224  for (const auto& file_path : processed_file_paths) {
225  if (!shared::contains(all_file_paths, file_path)) {
226  throw_removed_file_error(file_path);
227  }
228  }
229 
230  for (const auto& file_path : all_file_paths) {
231  if (!shared::contains(processed_file_paths, file_path)) {
232  new_file_paths.emplace_back(file_path);
233  }
234  }
235 
236  // Single file append
237  // If an append occurs with multiple files, then we assume any existing files have
238  // not been altered. If an append occurs on a single file, then we check to see if
239  // it has changed.
240  if (new_file_paths.empty() && all_file_paths.size() == 1) {
241  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
242  const auto& file_path = *all_file_paths.begin();
243  CHECK_EQ(*processed_file_paths.begin(), file_path);
244 
245  // Since an existing file is being appended to we need to update the cached
246  // FileReader as the existing one will be out of date.
247  auto reader = file_reader_cache_->insert(file_path, file_system_);
248  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
249 
250  if (row_count < total_row_count_) {
251  throw_removed_row_error(file_path);
252  } else if (row_count > total_row_count_) {
253  new_file_paths = all_file_paths;
254  chunk_metadata_map_.clear();
256  }
257  }
258  } else {
259  CHECK(chunk_metadata_map_.empty());
260  new_file_paths = getAllFilePaths();
262  }
263 
264  if (!new_file_paths.empty()) {
265  metadataScanFiles(new_file_paths);
266  }
267 }
bool contains(const T &container, const U &element)
Definition: misc.h:188
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::string > getAllFilePaths()
void throw_removed_row_error(const std::string &file_path)
void throw_removed_file_error(const std::string &file_path)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static SysCatalog & instance()
Definition: SysCatalog.h:325
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void metadataScanFiles(const std::vector< std::string > &file_paths)
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 172 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

172  {
175 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::vector< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 279 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_ORDER_BY_KEY, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_REGEX_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::OptionsContainer::getOption(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, shared::local_glob_filter_sort_files(), foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

Referenced by fetchChunkMetadata().

279  {
280  auto timer = DEBUG_TIMER(__func__);
281  std::vector<std::string> found_file_paths;
282  auto file_path = getFullFilePath(foreign_table_);
283  const auto& regex_pattern = foreign_table_->getOption(REGEX_PATH_FILTER_KEY);
284  const auto& sort_by = foreign_table_->getOption(FILE_SORT_ORDER_BY_KEY);
285  const auto& sort_regex = foreign_table_->getOption(FILE_SORT_REGEX_KEY);
286 
287  auto& server_options = foreign_table_->foreign_server->options;
288  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
289  found_file_paths = shared::local_glob_filter_sort_files(
290  file_path, regex_pattern, sort_by, sort_regex);
291  } else {
292  UNREACHABLE();
293  }
294  return found_file_paths;
295 }
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex)
#define UNREACHABLE()
Definition: Logger.h:253
std::optional< std::string > getOption(const std::string_view &key) const
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
#define DEBUG_TIMER(name)
Definition: Logger.h:352
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 54 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTER_FRAGMENT.

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 103 of file ParquetDataWrapper.cpp.

References CHECK, db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

104  {
106  CHECK(catalog);
107  const auto& columns = schema_->getLogicalAndPhysicalColumns();
108  auto column_start = column_interval.start;
109  auto column_end = column_interval.end;
110  std::list<const ColumnDescriptor*> columns_to_init;
111  for (const auto column : columns) {
112  auto column_id = column->columnId;
113  if (column_id >= column_start && column_id <= column_end) {
114  columns_to_init.push_back(column);
115  }
116  }
117  return columns_to_init;
118 }
std::unique_ptr< ForeignTableSchema > schema_
static SysCatalog & instance()
Definition: SysCatalog.h:325
T const end
Definition: Intervals.h:68
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 56 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::set< std::string > foreign_storage::ParquetDataWrapper::getProcessedFilePaths ( )
private

Definition at line 269 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

269  {
270  std::set<std::string> file_paths;
271  for (const auto& entry : fragment_to_row_group_interval_map_) {
272  for (const auto& row_group_interval : entry.second) {
273  file_paths.emplace(row_group_interval.file_path);
274  }
275  }
276  return file_paths;
277 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::string foreign_storage::ParquetDataWrapper::getSerializedDataWrapper ( ) const
override virtual

Serializes the internal state of the wrapper, if implemented. In this class the state is written to a JSON document and returned as a string; no file path is taken.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 507 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_string().

507  {
508  rapidjson::Document d;
509  d.SetObject();
510 
513  "fragment_to_row_group_interval_map",
514  d.GetAllocator());
515  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
517  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
519  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
521  d, total_row_count_, "total_row_count", d.GetAllocator());
522  return json_utils::write_to_string(d);
523 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
const ChunkToBufferMap &  required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 120 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

124  {
125  for (const auto column : getColumnsToInitialize(column_interval)) {
126  Chunk_NS::Chunk chunk{column};
127  ChunkKey data_chunk_key;
128  if (column->columnType.is_varlen_indeed()) {
129  data_chunk_key = {
130  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
131  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
132  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
133  chunk.setBuffer(data_buffer);
134 
135  ChunkKey index_chunk_key{
136  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
137  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
138  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
139  chunk.setIndexBuffer(index_buffer);
140  } else {
141  data_chunk_key = {
142  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
143  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
144  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
145  chunk.setBuffer(data_buffer);
146  }
147  chunk.initEncoder();
148  if (reserve_buffers_and_set_stats) {
149  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
150  CHECK(metadata_it != chunk_metadata_map_.end());
151  auto buffer = chunk.getBuffer();
152  auto& metadata = metadata_it->second;
153  auto encoder = buffer->getEncoder();
154  encoder->resetChunkStats(metadata->chunkStats);
155  encoder->setNumElems(metadata->numElements);
156  if (column->columnType.is_string() &&
157  column->columnType.get_compression() == kENCODING_NONE) {
158  auto index_buffer = chunk.getIndexBuf();
159  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
160  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
161  auto index_buffer = chunk.getIndexBuf();
162  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
163  } else {
164  size_t num_bytes_to_reserve =
165  metadata->numElements * column->columnType.get_size();
166  buffer->reserve(num_bytes_to_reserve);
167  }
168  }
169  }
170 }
std::vector< int > ChunkKey
Definition: types.h:37
int32_t StringOffsetT
Definition: sqltypes.h:1075
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
int32_t ArrayOffsetT
Definition: sqltypes.h:1076
V & get_from_map(std::map< K, V > &map, const K &key)
Definition: misc.h:58
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 189 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanFiles().

189  {
190  const auto last_fragment_entry =
192  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
193 
194  // The entry for the first fragment starts out as an empty vector
195  if (last_fragment_entry->second.empty()) {
197  return true;
198  } else {
199  return (last_fragment_entry->second.back().file_path != file_path);
200  }
201 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 546 of file ParquetDataWrapper.cpp.

References is_restored_.

546  {
547  return is_restored_;
548 }
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int  logical_column_id,
const int  fragment_id,
const ChunkToBufferMap &  required_buffers 
)
private

Definition at line 358 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, foreign_storage::Interval< T >::end, file_reader_cache_, file_system_, foreign_table_, fragment_to_row_group_interval_map_, TableDescriptor::fragmenter, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), shared::get_from_map(), SQLTypeInfo::get_physical_cols(), Catalog_Namespace::SysCatalog::getCatalog(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

361  {
363  CHECK(catalog);
364  const ColumnDescriptor* logical_column =
365  schema_->getColumnDescriptor(logical_column_id);
366  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
367 
368  const Interval<ColumnType> column_interval = {
369  logical_column_id,
370  logical_column_id + logical_column->columnType.get_physical_cols()};
371  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
372 
373  const auto& row_group_intervals =
375 
376  const bool is_dictionary_encoded_string_column =
377  logical_column->columnType.is_dict_encoded_string() ||
378  (logical_column->columnType.is_array() &&
379  logical_column->columnType.get_elem_type().is_dict_encoded_string());
380 
381  StringDictionary* string_dictionary = nullptr;
382  if (is_dictionary_encoded_string_column) {
383  auto dict_descriptor =
384  catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
385  CHECK(dict_descriptor);
386  string_dictionary = dict_descriptor->stringDict.get();
387  }
388 
389  std::list<Chunk_NS::Chunk> chunks;
390  for (int column_id = column_interval.start; column_id <= column_interval.end;
391  ++column_id) {
392  auto column_descriptor = schema_->getColumnDescriptor(column_id);
393  Chunk_NS::Chunk chunk{column_descriptor};
394  if (column_descriptor->columnType.is_varlen_indeed()) {
395  ChunkKey data_chunk_key = {
396  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
397  auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
398  chunk.setBuffer(buffer);
399  ChunkKey index_chunk_key = {
400  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
401  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
402  chunk.setIndexBuffer(index_buffer);
403  } else {
404  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
405  auto buffer = shared::get_from_map(required_buffers, chunk_key);
406  chunk.setBuffer(buffer);
407  }
408  chunks.emplace_back(chunk);
409  }
410 
411  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
412  auto metadata = chunk_loader.loadChunk(
413  row_group_intervals, parquet_column_index, chunks, string_dictionary);
414  auto fragmenter = foreign_table_->fragmenter;
415 
416  auto metadata_iter = metadata.begin();
417  for (int column_id = column_interval.start; column_id <= column_interval.end;
418  ++column_id, ++metadata_iter) {
419  auto column = schema_->getColumnDescriptor(column_id);
420  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
421  if (column->columnType.is_varlen_indeed()) {
422  data_chunk_key.emplace_back(1);
423  }
424  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
425 
426  // Allocate new shared_ptr for metadata so we don't modify old one which may be used
427  // by executor
428  auto cached_metadata_previous =
429  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
430  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
431  std::make_shared<ChunkMetadata>();
432  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
433  *cached_metadata = *cached_metadata_previous;
434 
435  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
436  cached_metadata->numBytes =
437  shared::get_from_map(required_buffers, data_chunk_key)->size();
438 
439  // for certain types, update the metadata statistics
440  // should update the fragmenter, cache, and the internal chunk_metadata_map_
441  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
442  CHECK(metadata_iter != metadata.end());
443  auto& chunk_metadata_ptr = *metadata_iter;
444  cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
445  cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
446 
447  // Update stats on buffer so it is saved in cache
448  shared::get_from_map(required_buffers, data_chunk_key)
449  ->getEncoder()
450  ->resetChunkStats(cached_metadata->chunkStats);
451  }
452 
453  if (fragmenter) {
454  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
455  }
456  }
457 }
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int get_physical_cols() const
Definition: sqltypes.h:350
static SysCatalog & instance()
Definition: SysCatalog.h:325
T const end
Definition: Intervals.h:68
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
specifies the content in-memory of a row in the column metadata table
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V > &map, const K &key)
Definition: misc.h:58
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:209
bool is_geometry() const
Definition: sqltypes.h:521
bool is_dict_encoded_string() const
Definition: sqltypes.h:546
SQLTypeInfo columnType
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:850
bool is_array() const
Definition: sqltypes.h:517

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::vector< std::string > &  file_paths)
private

Definition at line 297 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, file_reader_cache_, file_system_, finalizeFragmentMap(), foreign_table_, isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::LazyParquetChunkLoader::metadataScan(), moveToNextFragment(), foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, TableDescriptor::tableId, and total_row_count_.

Referenced by fetchChunkMetadata().

297  {
298  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
299  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
300  auto column_interval =
301  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
302  schema_->getLogicalAndPhysicalColumns().back()->columnId};
303 
304  for (const auto& row_group_metadata_item : row_group_metadata) {
305  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
306  CHECK(static_cast<int>(column_chunk_metadata.size()) ==
307  schema_->numLogicalAndPhysicalColumns());
308  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
309  const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
310  int row_group = row_group_metadata_item.row_group_index;
311  const auto& file_path = row_group_metadata_item.file_path;
312  if (moveToNextFragment(import_row_count)) {
313  addNewFragment(row_group, file_path);
314  } else if (isNewFile(file_path)) {
315  CHECK_EQ(row_group, 0);
316  addNewFile(file_path);
317  }
318  last_row_group_ = row_group;
319 
320  for (int column_id = column_interval.start; column_id <= column_interval.end;
321  column_id++, column_chunk_metadata_iter++) {
322  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
323  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
324 
325  const auto& type_info = column_descriptor->columnType;
326  ChunkKey chunk_key{
328  ChunkKey data_chunk_key = chunk_key;
329  if (type_info.is_varlen_indeed()) {
330  data_chunk_key.emplace_back(1);
331  }
332  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
333  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
334  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
335  } else {
336  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
337  }
338  }
339  last_fragment_row_count_ += import_row_count;
340  total_row_count_ += import_row_count;
341  }
343 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
void addNewFragment(int row_group, const std::string &file_path)
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 345 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanFiles().

345  {
346  return (last_fragment_row_count_ + new_rows_count) >
347  static_cast<size_t>(foreign_table_->maxFragRows);
348 }

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( const ChunkToBufferMap &  required_buffers,
const ChunkToBufferMap &  optional_buffers
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can optionally be populated if the data wrapper has to scan through the chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 459 of file ParquetDataWrapper.cpp.

References threading_serial::async(), CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, g_max_import_threads, loadBuffersUsingLazyParquetChunkLoader(), foreign_storage::partition_for_threads(), and schema_.

460  {
461  ChunkToBufferMap buffers_to_load;
462  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
463  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
464 
465  CHECK(!buffers_to_load.empty());
466 
467  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
468  for (const auto& [chunk_key, buffer] : buffers_to_load) {
469  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
470  col_frag_hints.emplace(
471  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
472  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
473  }
474 
475  auto hints_per_thread = partition_for_threads(col_frag_hints, g_max_import_threads);
476 
477  std::vector<std::future<void>> futures;
478  for (const auto& hint_set : hints_per_thread) {
479  futures.emplace_back(std::async(std::launch::async, [&, hint_set, this] {
480  for (const auto& [col_id, frag_id] : hint_set) {
481  loadBuffersUsingLazyParquetChunkLoader(col_id, frag_id, buffers_to_load);
482  }
483  }));
484  }
485 
486  for (auto& future : futures) {
487  future.get();
488  }
489 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: ParquetShared.h:41
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
future< Result > async(Fn &&fn, Args &&...args)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers)
#define CHECK(condition)
Definition: Logger.h:209
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
size_t g_max_import_threads
Definition: Importer.cpp:85

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector &  chunk_metadata_vector)
override virtual

Populates the given chunk metadata vector with metadata for all chunks in the related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 350 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

351  {
352  fetchChunkMetadata();
353  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
354  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
355  }
356 }
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 92 of file ParquetDataWrapper.cpp.

References file_reader_cache_, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata().

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata_vector
)
override virtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 525 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

527  {
528  auto d = json_utils::read_from_file(file_path);
529  CHECK(d.IsObject());
530 
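531  json_utils::get_value_from_object(
532  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
533  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
534  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
535  json_utils::get_value_from_object(
536  d, last_fragment_row_count_, "last_fragment_row_count");
537  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");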
538 
539  CHECK(chunk_metadata_map_.empty());
540  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
541  chunk_metadata_map_[chunk_key] = chunk_metadata;
542  }
543  is_restored_ = true;
544 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
rapidjson::Document read_from_file(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

Member Data Documentation

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::ParquetDataWrapper::chunk_metadata_map_
private
const int foreign_storage::ParquetDataWrapper::db_id_
private
std::unique_ptr<FileReaderMap> foreign_storage::ParquetDataWrapper::file_reader_cache_
private
std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private
const ForeignTable* foreign_storage::ParquetDataWrapper::foreign_table_
private
std::map<int, std::vector<RowGroupInterval> > foreign_storage::ParquetDataWrapper::fragment_to_row_group_interval_map_
private
bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 96 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::ParquetDataWrapper::last_fragment_index_
private
size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private
int foreign_storage::ParquetDataWrapper::last_row_group_
private
std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetDataWrapper::schema_
private
size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files: