OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>

+ Inheritance diagram for foreign_storage::ParquetDataWrapper:
+ Collaboration diagram for foreign_storage::ParquetDataWrapper:

Public Member Functions

 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 

Static Public Member Functions

static void validateOptions (const ForeignTable *foreign_table)
 
static std::vector
< std::string_view > 
getSupportedOptions ()
 

Private Member Functions

 ParquetDataWrapper (const ForeignTable *foreign_table)
 
std::list< const
ColumnDescriptor * > 
getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer * > &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer * > &required_buffers)
 
void validateFilePath () const
 
std::set< std::string > getProcessedFilePaths ()
 
std::set< std::string > getAllFilePaths ()
 
import_export::CopyParams validateAndGetCopyParams () const
 
std::string validateAndGetStringWithLength (const std::string &option_name, const size_t expected_num_chars) const
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::set< std::string > &file_paths)
 

Private Attributes

std::map< int, std::vector
< RowGroupInterval > > 
fragment_to_row_group_interval_map_
 
std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 

Static Private Attributes

static constexpr std::array
< char const *, 0 > 
supported_options_ {}
 

Detailed Description

Definition at line 33 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table 
)

Definition at line 79 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and UNREACHABLE.

80  : db_id_(db_id)
81  , foreign_table_(foreign_table)
84  , total_row_count_(0)
85  , last_row_group_(0)
86  , is_restored_(false)
87  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table)) {
88  auto& server_options = foreign_table->foreign_server->options;
89  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
90  ForeignServer::LOCAL_FILE_STORAGE_TYPE) {
91  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
92  } else {
93  UNREACHABLE();
94  }
95 }
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:241
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
std::shared_ptr< arrow::fs::FileSystem > file_system_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const ForeignTable *  foreign_table)
private

Definition at line 97 of file ParquetDataWrapper.cpp.

98  : db_id_(-1), foreign_table_(foreign_table) {}

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 239 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

239  {
240  const auto last_fragment_entry =
242  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
243 
244  // The entry for the first fragment starts out as an empty vector
245  if (last_fragment_entry->second.empty()) {
247  } else {
248  last_fragment_entry->second.back().end_index = last_row_group_;
249  }
250  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
251 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 213 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, and last_row_group_.

Referenced by metadataScanFiles().

213  {
214  const auto last_fragment_entry =
216  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
217 
218  last_fragment_entry->second.back().end_index = last_row_group_;
222  RowGroupInterval{file_path, row_group});
223 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 253 of file ParquetDataWrapper.cpp.

References CHECK_EQ, Catalog_Namespace::SysCatalog::checkedGetCatalog(), chunk_metadata_map_, db_id_, file_system_, foreign_table_, getAllFilePaths(), getProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), metadataScanFiles(), foreign_storage::open_parquet_table(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_error(), and total_row_count_.

Referenced by populateChunkMetadata().

253  {
255  std::set<std::string> new_file_paths;
256  auto processed_file_paths = getProcessedFilePaths();
257  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
258  auto all_file_paths = getAllFilePaths();
259  for (const auto& file_path : processed_file_paths) {
260  if (all_file_paths.find(file_path) == all_file_paths.end()) {
261  throw_removed_file_error(file_path);
262  }
263  }
264 
265  for (const auto& file_path : all_file_paths) {
266  if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
267  new_file_paths.emplace(file_path);
268  }
269  }
270 
271  // Single file append
272  if (new_file_paths.empty() && all_file_paths.size() == 1) {
273  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
274  const auto& file_path = *all_file_paths.begin();
275  CHECK_EQ(*processed_file_paths.begin(), file_path);
276 
277  std::unique_ptr<parquet::arrow::FileReader> reader;
278  open_parquet_table(file_path, reader, file_system_);
279  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
280 
281  if (row_count < total_row_count_) {
282  throw_removed_row_error(file_path);
283  } else if (row_count > total_row_count_) {
284  new_file_paths = all_file_paths;
285  chunk_metadata_map_.clear();
287  }
288  }
289  } else {
290  new_file_paths = getAllFilePaths();
291  chunk_metadata_map_.clear();
293  }
294 
295  if (!new_file_paths.empty()) {
296  metadataScanFiles(new_file_paths);
297  }
298 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::set< std::string > getProcessedFilePaths()
std::set< std::string > getAllFilePaths()
void throw_removed_row_error(const std::string &file_path)
void throw_removed_file_error(const std::string &file_path)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static SysCatalog & instance()
Definition: SysCatalog.h:288
void metadataScanFiles(const std::set< std::string > &file_paths)
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
void open_parquet_table(const std::string &file_path, std::unique_ptr< parquet::arrow::FileReader > &reader, std::shared_ptr< arrow::fs::FileSystem > &file_system)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 208 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

208  {
211 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::set< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 310 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, file_system_, foreign_table_, and foreign_storage::ForeignTable::getFullFilePath().

Referenced by fetchChunkMetadata().

310  {
311  auto timer = DEBUG_TIMER(__func__);
312  std::set<std::string> file_paths;
313  arrow::fs::FileSelector file_selector{};
314  std::string base_path = foreign_table_->getFullFilePath();
315  file_selector.base_dir = base_path;
316  file_selector.recursive = true;
317 
318  auto file_info_result = file_system_->GetFileInfo(file_selector);
319  if (!file_info_result.ok()) {
320  // This is expected when `base_path` points to a single file.
321  file_paths.emplace(base_path);
322  } else {
323  auto& file_info_vector = file_info_result.ValueOrDie();
324  for (const auto& file_info : file_info_vector) {
325  if (file_info.type() == arrow::fs::FileType::File) {
326  file_paths.emplace(file_info.path());
327  }
328  }
329  if (file_paths.empty()) {
330  throw std::runtime_error{"No file found at given path \"" + base_path + "\"."};
331  }
332  }
333  return file_paths;
334 }
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
#define DEBUG_TIMER(name)
Definition: Logger.h:313

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 139 of file ParquetDataWrapper.cpp.

References Catalog_Namespace::SysCatalog::checkedGetCatalog(), db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

140  {
141  const auto catalog =
143  const auto& columns = schema_->getLogicalAndPhysicalColumns();
144  auto column_start = column_interval.start;
145  auto column_end = column_interval.end;
146  std::list<const ColumnDescriptor*> columns_to_init;
147  for (const auto column : columns) {
148  auto column_id = column->columnId;
149  if (column_id >= column_start && column_id <= column_end) {
150  columns_to_init.push_back(column);
151  }
152  }
153  return columns_to_init;
154 }
std::unique_ptr< ForeignTableSchema > schema_
static SysCatalog & instance()
Definition: SysCatalog.h:288
T const end
Definition: Intervals.h:68
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::set< std::string > foreign_storage::ParquetDataWrapper::getProcessedFilePaths ( )
private

Definition at line 300 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

300  {
301  std::set<std::string> file_paths;
302  for (const auto& entry : fragment_to_row_group_interval_map_) {
303  for (const auto& row_group_interval : entry.second) {
304  file_paths.emplace(row_group_interval.file_path);
305  }
306  }
307  return file_paths;
308 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::vector< std::string_view > foreign_storage::ParquetDataWrapper::getSupportedOptions ( )
static

Definition at line 115 of file ParquetDataWrapper.cpp.

References supported_options_.

115  {
116  return std::vector<std::string_view>{supported_options_.begin(),
117  supported_options_.end()};
118 }
static constexpr std::array< char const *, 0 > supported_options_
void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
std::map< ChunkKey, AbstractBuffer * > &  required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 156 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

160  {
161  for (const auto column : getColumnsToInitialize(column_interval)) {
162  Chunk_NS::Chunk chunk{column};
163  ChunkKey data_chunk_key;
164  if (column->columnType.is_varlen_indeed()) {
165  data_chunk_key = {
166  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
167  auto data_buffer = required_buffers[data_chunk_key];
168  CHECK(data_buffer);
169  chunk.setBuffer(data_buffer);
170 
171  ChunkKey index_chunk_key{
172  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
173  auto index_buffer = required_buffers[index_chunk_key];
174  CHECK(index_buffer);
175  chunk.setIndexBuffer(index_buffer);
176  } else {
177  data_chunk_key = {
178  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
179  auto data_buffer = required_buffers[data_chunk_key];
180  CHECK(data_buffer);
181  chunk.setBuffer(data_buffer);
182  }
183  chunk.initEncoder();
184  if (reserve_buffers_and_set_stats) {
185  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
186  CHECK(metadata_it != chunk_metadata_map_.end());
187  auto buffer = chunk.getBuffer();
188  auto& metadata = metadata_it->second;
189  auto encoder = buffer->getEncoder();
190  encoder->resetChunkStats(metadata->chunkStats);
191  encoder->setNumElems(metadata->numElements);
192  if (column->columnType.is_string() &&
193  column->columnType.get_compression() == kENCODING_NONE) {
194  auto index_buffer = chunk.getIndexBuf();
195  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
196  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
197  auto index_buffer = chunk.getIndexBuf();
198  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
199  } else {
200  size_t num_bytes_to_reserve =
201  metadata->numElements * column->columnType.get_size();
202  buffer->reserve(num_bytes_to_reserve);
203  }
204  }
205  }
206 }
std::vector< int > ChunkKey
Definition: types.h:37
int32_t StringOffsetT
Definition: sqltypes.h:919
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
int32_t ArrayOffsetT
Definition: sqltypes.h:920
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 225 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanFiles().

225  {
226  const auto last_fragment_entry =
228  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
229 
230  // The entry for the first fragment starts out as an empty vector
231  if (last_fragment_entry->second.empty()) {
233  return true;
234  } else {
235  return (last_fragment_entry->second.back().file_path != file_path);
236  }
237 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 584 of file ParquetDataWrapper.cpp.

References is_restored_.

584  {
585  return is_restored_;
586 }
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int  logical_column_id,
const int  fragment_id,
std::map< ChunkKey, AbstractBuffer * > &  required_buffers 
)
private

Definition at line 421 of file ParquetDataWrapper.cpp.

References CHECK, Catalog_Namespace::SysCatalog::checkedGetCatalog(), chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, foreign_storage::Interval< T >::end, file_system_, foreign_table_, fragment_to_row_group_interval_map_, TableDescriptor::fragmenter, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_physical_cols(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

424  {
426  const ColumnDescriptor* logical_column =
427  schema_->getColumnDescriptor(logical_column_id);
428  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
429 
430  const Interval<ColumnType> column_interval = {
431  logical_column_id,
432  logical_column_id + logical_column->columnType.get_physical_cols()};
433  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
434 
435  const auto& row_group_intervals = fragment_to_row_group_interval_map_[fragment_id];
436 
437  const bool is_dictionary_encoded_string_column =
438  logical_column->columnType.is_dict_encoded_string() ||
439  (logical_column->columnType.is_array() &&
440  logical_column->columnType.get_elem_type().is_dict_encoded_string());
441 
442  StringDictionary* string_dictionary = nullptr;
443  if (is_dictionary_encoded_string_column) {
444  auto dict_descriptor = catalog->getMetadataForDictUnlocked(
445  logical_column->columnType.get_comp_param(), true);
446  CHECK(dict_descriptor);
447  string_dictionary = dict_descriptor->stringDict.get();
448  }
449 
450  std::list<Chunk_NS::Chunk> chunks;
451  for (int column_id = column_interval.start; column_id <= column_interval.end;
452  ++column_id) {
453  auto column_descriptor = schema_->getColumnDescriptor(column_id);
454  Chunk_NS::Chunk chunk{column_descriptor};
455  if (column_descriptor->columnType.is_varlen_indeed()) {
456  ChunkKey data_chunk_key = {
457  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
458  auto buffer = required_buffers[data_chunk_key];
459  CHECK(buffer);
460  chunk.setBuffer(buffer);
461  ChunkKey index_chunk_key = {
462  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
463  auto index_buffer = required_buffers[index_chunk_key];
464  CHECK(index_buffer);
465  chunk.setIndexBuffer(index_buffer);
466  } else {
467  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
468  auto buffer = required_buffers[chunk_key];
469  CHECK(buffer);
470  chunk.setBuffer(buffer);
471  }
472  chunks.emplace_back(chunk);
473  }
474 
475  LazyParquetChunkLoader chunk_loader(file_system_);
476  auto metadata = chunk_loader.loadChunk(
477  row_group_intervals, parquet_column_index, chunks, string_dictionary);
478  auto fragmenter = foreign_table_->fragmenter;
479  if (fragmenter) {
480  auto metadata_iter = metadata.begin();
481  for (int column_id = column_interval.start; column_id <= column_interval.end;
482  ++column_id, ++metadata_iter) {
483  auto column = schema_->getColumnDescriptor(column_id);
484  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
485  if (column->columnType.is_varlen_indeed()) {
486  data_chunk_key.emplace_back(1);
487  }
488  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
489  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
490  auto updated_metadata = std::make_shared<ChunkMetadata>();
491  *updated_metadata = *cached_metadata;
492  // for certain types, update the metadata statistics
493  if (is_dictionary_encoded_string_column ||
494  logical_column->columnType.is_geometry()) {
495  CHECK(metadata_iter != metadata.end());
496  auto& chunk_metadata_ptr = *metadata_iter;
497  updated_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
498  updated_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
499  }
500  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
501  updated_metadata->numBytes = required_buffers[data_chunk_key]->size();
502  fragmenter->updateColumnChunkMetadata(column, fragment_id, updated_metadata);
503  }
504  }
505 }
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< ForeignTableSchema > schema_
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int get_physical_cols() const
Definition: sqltypes.h:332
static SysCatalog & instance()
Definition: SysCatalog.h:288
T const end
Definition: Intervals.h:68
specifies the content in-memory of a row in the column metadata table
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::shared_ptr< Catalog > checkedGetCatalog(const int32_t db_id)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
#define CHECK(condition)
Definition: Logger.h:197
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer * > &required_buffers, const bool reserve_buffers_and_set_stats=false)
bool is_geometry() const
Definition: sqltypes.h:490
bool is_dict_encoded_string() const
Definition: sqltypes.h:512
SQLTypeInfo columnType
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:697
bool is_array() const
Definition: sqltypes.h:486

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::set< std::string > &  file_paths)
private

Definition at line 360 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, file_system_, finalizeFragmentMap(), foreign_table_, isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::LazyParquetChunkLoader::metadataScan(), moveToNextFragment(), foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, TableDescriptor::tableId, and total_row_count_.

Referenced by fetchChunkMetadata().

360  {
361  LazyParquetChunkLoader chunk_loader(file_system_);
362  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
363  auto column_interval =
364  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
365  schema_->getLogicalAndPhysicalColumns().back()->columnId};
366 
367  for (const auto& row_group_metadata_item : row_group_metadata) {
368  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
369  CHECK(static_cast<int>(column_chunk_metadata.size()) ==
370  schema_->numLogicalAndPhysicalColumns());
371  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
372  const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
373  int row_group = row_group_metadata_item.row_group_index;
374  const auto& file_path = row_group_metadata_item.file_path;
375  if (moveToNextFragment(import_row_count)) {
376  addNewFragment(row_group, file_path);
377  } else if (isNewFile(file_path)) {
378  CHECK_EQ(row_group, 0);
379  addNewFile(file_path);
380  }
381  last_row_group_ = row_group;
382 
383  for (int column_id = column_interval.start; column_id <= column_interval.end;
384  column_id++, column_chunk_metadata_iter++) {
385  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
386  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
387 
388  const auto& type_info = column_descriptor->columnType;
389  ChunkKey chunk_key{
391  ChunkKey data_chunk_key = chunk_key;
392  if (type_info.is_varlen_indeed()) {
393  data_chunk_key.emplace_back(1);
394  }
395  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
396  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
397  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
398  } else {
399  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
400  }
401  }
402  last_fragment_row_count_ += import_row_count;
403  total_row_count_ += import_row_count;
404  }
406 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< ForeignTableSchema > schema_
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
void addNewFragment(int row_group, const std::string &file_path)
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 408 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanFiles().

408  {
409  return (last_fragment_row_count_ + new_rows_count) >
410  static_cast<size_t>(foreign_table_->maxFragRows);
411 }

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( std::map< ChunkKey, AbstractBuffer * > &  required_buffers,
std::map< ChunkKey, AbstractBuffer * > &  optional_buffers 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 507 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, loadBuffersUsingLazyParquetChunkLoader(), and schema_.

509  {
510  CHECK(!required_buffers.empty());
511  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
512 
513  std::set<int> logical_column_ids;
514  for (const auto& [chunk_key, buffer] : required_buffers) {
515  CHECK_EQ(fragment_id, chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
516  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
517  const auto column_id =
518  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId;
519  logical_column_ids.emplace(column_id);
520  }
521 
522  for (const auto column_id : logical_column_ids) {
523  loadBuffersUsingLazyParquetChunkLoader(column_id, fragment_id, required_buffers);
524  }
525 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer * > &required_buffers)
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:197
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector &  chunk_metadata_vector)
overridevirtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 413 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

414  {
415  fetchChunkMetadata();
416  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
417  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
418  }
419 }
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 129 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata().

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector &  chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeDataWrapperInternals
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 563 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

565  {
566  auto d = json_utils::read_from_file(file_path);
567  CHECK(d.IsObject());
568 
570  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
572  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
574  d, last_fragment_row_count_, "last_fragment_row_count");
576 
577  CHECK(chunk_metadata_map_.empty());
578  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
579  chunk_metadata_map_[chunk_key] = chunk_metadata;
580  }
581  is_restored_ = true;
582 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
overridevirtual

Serialize the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path- location to save file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 543 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_file().

544  {
545  rapidjson::Document d;
546  d.SetObject();
547 
550  "fragment_to_row_group_interval_map",
551  d.GetAllocator());
552  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
554  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
556  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
558  d, total_row_count_, "total_row_count", d.GetAllocator());
559 
560  json_utils::write_to_file(d, file_path);
561 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)

+ Here is the call graph for this function:

import_export::CopyParams foreign_storage::ParquetDataWrapper::validateAndGetCopyParams ( ) const
private

Definition at line 336 of file ParquetDataWrapper.cpp.

References import_export::CopyParams::file_type.

336  {
337  import_export::CopyParams copy_params{};
338  // The file_type argument is never utilized in the context of FSI,
339  // for completeness, set the file_type
340  copy_params.file_type = import_export::FileType::PARQUET;
341  return copy_params;
342 }
std::string foreign_storage::ParquetDataWrapper::validateAndGetStringWithLength ( const std::string &  option_name,
const size_t  expected_num_chars 
) const
private

Validates that the value of the given table option has the expected number of characters. An exception is thrown if the number of characters does not match.

Parameters
option_name- name of table option whose value is validated and returned
expected_num_chars- expected number of characters for option value
Returns
Value of the option if the number of characters matches. Returns an empty string if the table options do not contain the provided option.

Definition at line 344 of file ParquetDataWrapper.cpp.

References foreign_table_, foreign_storage::OptionsContainer::options, and to_string().

346  {
347  if (auto it = foreign_table_->options.find(option_name);
348  it != foreign_table_->options.end()) {
349  if (it->second.length() != expected_num_chars) {
350  throw std::runtime_error{"Value of \"" + option_name +
351  "\" foreign table option has the wrong number of "
352  "characters. Expected " +
353  std::to_string(expected_num_chars) + " character(s)."};
354  }
355  return it->second;
356  }
357  return "";
358 }
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::validateFilePath ( ) const
private

Definition at line 120 of file ParquetDataWrapper.cpp.

References foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::ForeignTable::getFullFilePath(), ddl_utils::IMPORT, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and ddl_utils::validate_allowed_file_path().

120  {
121  auto& server_options = foreign_table_->foreign_server->options;
122  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
126  }
127 }
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:45
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:613
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:47
std::string getFullFilePath() const
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
const ForeignServer * foreign_server
Definition: ForeignTable.h:63

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::validateOptions ( const ForeignTable *  foreign_table)
static

Definition at line 100 of file ParquetDataWrapper.cpp.

References foreign_storage::OptionsContainer::options, foreign_storage::ForeignTable::supported_options, and supported_options_.

100  {
101  for (const auto& entry : foreign_table->options) {
102  const auto& table_options = foreign_table->supported_options;
103  if (std::find(table_options.begin(), table_options.end(), entry.first) ==
104  table_options.end() &&
105  std::find(supported_options_.begin(), supported_options_.end(), entry.first) ==
106  supported_options_.end()) {
107  throw std::runtime_error{"Invalid foreign table option \"" + entry.first + "\"."};
108  }
109  }
110  ParquetDataWrapper data_wrapper{foreign_table};
111  data_wrapper.validateAndGetCopyParams();
112  data_wrapper.validateFilePath();
113 }
ParquetDataWrapper(const int db_id, const ForeignTable *foreign_table)
static constexpr std::array< char const *, 0 > supported_options_

Member Data Documentation

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::ParquetDataWrapper::chunk_metadata_map_
private
const int foreign_storage::ParquetDataWrapper::db_id_
private
std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private
std::map<int, std::vector<RowGroupInterval> > foreign_storage::ParquetDataWrapper::fragment_to_row_group_interval_map_
private
bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 109 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::ParquetDataWrapper::last_fragment_index_
private
size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private
int foreign_storage::ParquetDataWrapper::last_row_group_
private
std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetDataWrapper::schema_
private
constexpr std::array<char const*, 0> foreign_storage::ParquetDataWrapper::supported_options_ {}
staticprivate

Definition at line 113 of file ParquetDataWrapper.h.

Referenced by getSupportedOptions(), and validateOptions().

size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files: