OmniSciDB  2e3a973ef4
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>

+ Inheritance diagram for foreign_storage::ParquetDataWrapper:
+ Collaboration diagram for foreign_storage::ParquetDataWrapper:

Public Member Functions

 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (std::map< ChunkKey, AbstractBuffer *> &required_buffers, std::map< ChunkKey, AbstractBuffer *> &optional_buffers) override
 
void serializeDataWrapperInternals (const std::string &file_path) const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 

Static Public Member Functions

static void validateOptions (const ForeignTable *foreign_table)
 
static std::vector< std::string_view > getSupportedOptions ()
 

Private Member Functions

 ParquetDataWrapper (const ForeignTable *foreign_table)
 
std::list< const ColumnDescriptor * > getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer *> &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetImporter (const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
 
void validateFilePath () const
 
std::string getConfiguredFilePath () const
 
std::set< std::string > getProcessedFilePaths ()
 
std::set< std::string > getAllFilePaths ()
 
import_export::CopyParams validateAndGetCopyParams () const
 
std::string validateAndGetStringWithLength (const std::string &option_name, const size_t expected_num_chars) const
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void resetParquetMetadata ()
 
void updateStatsForEncoder (Encoder *encoder, const SQLTypeInfo type_info, const DataBlockPtr &data_block, const size_t import_count)
 
void loadMetadataChunk (const ColumnDescriptor *column, const ChunkKey &chunk_key, DataBlockPtr &data_block, const size_t import_count, const bool has_nulls, const bool is_all_nulls, const ArrayMetadataStats &array_stats)
 
void loadChunk (const ColumnDescriptor *column, const ChunkKey &chunk_key, DataBlockPtr &data_block, const size_t import_count, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
 
import_export::Loader * getMetadataLoader (Catalog_Namespace::Catalog &catalog, const ParquetLoaderMetadata &parquet_loader_metadata)
 
import_export::Loader * getChunkLoader (Catalog_Namespace::Catalog &catalog, const Interval< ColumnType > &column_interval, const int db_id, const int fragment_index, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
 

Private Attributes

std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
 
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr< ForeignTableSchema > schema_
 
std::shared_ptr< arrow::fs::FileSystem > file_system_
 

Static Private Attributes

static constexpr std::array< char const *, 4 > supported_options_
 

Detailed Description

Definition at line 33 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

◆ ParquetDataWrapper() [1/2]

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table 
)

Definition at line 46 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and UNREACHABLE.

47  : db_id_(db_id)
48  , foreign_table_(foreign_table)
51  , total_row_count_(0)
52  , last_row_group_(0)
53  , is_restored_(false)
54  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table)) {
55  auto& server_options = foreign_table->foreign_server->options;
56  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
58  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
59  } else {
60  UNREACHABLE();
61  }
62 }
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:241
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
std::shared_ptr< arrow::fs::FileSystem > file_system_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45

◆ ParquetDataWrapper() [2/2]

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const ForeignTable *  foreign_table)
private

Definition at line 64 of file ParquetDataWrapper.cpp.

65  : db_id_(-1), foreign_table_(foreign_table) {}

Member Function Documentation

◆ addNewFile()

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 205 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by getMetadataLoader().

205  {
206  const auto last_fragment_entry =
208  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
209 
210  // The entry for the first fragment starts out as an empty vector
211  if (last_fragment_entry->second.empty()) {
213  } else {
214  last_fragment_entry->second.back().end_index = last_row_group_;
215  }
216  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
217 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

◆ addNewFragment()

void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 179 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, and last_row_group_.

Referenced by getMetadataLoader().

179  {
180  const auto last_fragment_entry =
182  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
183 
184  last_fragment_entry->second.back().end_index = last_row_group_;
188  RowGroupInterval{file_path, row_group});
189 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

◆ fetchChunkMetadata()

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 219 of file ParquetDataWrapper.cpp.

References CHECK_EQ, Catalog_Namespace::Catalog::checkedGet(), chunk_metadata_map_, db_id_, file_system_, finalizeFragmentMap(), foreign_table_, getAllFilePaths(), getMetadataLoader(), getProcessedFilePaths(), foreign_storage::ForeignTable::isAppendMode(), foreign_storage::LazyParquetImporter::metadataScan(), foreign_storage::open_parquet_table(), resetParquetMetadata(), schema_, foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_error(), total_row_count_, and validateAndGetCopyParams().

Referenced by populateChunkMetadata().

219  {
221  std::set<std::string> new_file_paths;
222  auto processed_file_paths = getProcessedFilePaths();
223  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
224  auto all_file_paths = getAllFilePaths();
225  for (const auto& file_path : processed_file_paths) {
226  if (all_file_paths.find(file_path) == all_file_paths.end()) {
227  throw_removed_file_error(file_path);
228  }
229  }
230 
231  for (const auto& file_path : all_file_paths) {
232  if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
233  new_file_paths.emplace(file_path);
234  }
235  }
236 
237  // Single file append
238  if (new_file_paths.empty() && all_file_paths.size() == 1) {
239  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
240  const auto& file_path = *all_file_paths.begin();
241  CHECK_EQ(*processed_file_paths.begin(), file_path);
242 
243  std::unique_ptr<parquet::arrow::FileReader> reader;
244  open_parquet_table(file_path, reader, file_system_);
245  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
246 
247  if (row_count < total_row_count_) {
248  throw_removed_row_error(file_path);
249  } else if (row_count > total_row_count_) {
250  new_file_paths = all_file_paths;
251  chunk_metadata_map_.clear();
253  }
254  }
255  } else {
256  new_file_paths = getAllFilePaths();
257  chunk_metadata_map_.clear();
259  }
260 
261  if (!new_file_paths.empty()) {
262  ParquetLoaderMetadata parquet_loader_metadata;
263  LazyParquetImporter importer(getMetadataLoader(*catalog, parquet_loader_metadata),
264  new_file_paths,
265  file_system_,
267  parquet_loader_metadata,
268  *schema_);
269  importer.metadataScan();
271  }
272 }
import_export::Loader * getMetadataLoader(Catalog_Namespace::Catalog &catalog, const ParquetLoaderMetadata &parquet_loader_metadata)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::set< std::string > getProcessedFilePaths()
std::set< std::string > getAllFilePaths()
std::unique_ptr< ForeignTableSchema > schema_
void throw_removed_row_error(const std::string &file_path)
void throw_removed_file_error(const std::string &file_path)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
std::shared_ptr< arrow::fs::FileSystem > file_system_
void open_parquet_table(const std::string &file_path, std::unique_ptr< parquet::arrow::FileReader > &reader, std::shared_ptr< arrow::fs::FileSystem > &file_system)
import_export::CopyParams validateAndGetCopyParams() const
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ finalizeFragmentMap()

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 174 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by fetchChunkMetadata().

174  {
177 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
+ Here is the caller graph for this function:

◆ getAllFilePaths()

std::set< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 308 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, file_system_, and getConfiguredFilePath().

Referenced by fetchChunkMetadata().

308  {
309  auto timer = DEBUG_TIMER(__func__);
310  std::set<std::string> file_paths;
311  arrow::fs::FileSelector file_selector{};
312  std::string base_path = getConfiguredFilePath();
313  file_selector.base_dir = base_path;
314  file_selector.recursive = true;
315 
316  auto file_info_result = file_system_->GetFileInfo(file_selector);
317  if (!file_info_result.ok()) {
318  // This is expected when `base_path` points to a single file.
319  file_paths.emplace(base_path);
320  } else {
321  auto& file_info_vector = file_info_result.ValueOrDie();
322  for (const auto& file_info : file_info_vector) {
323  if (file_info.type() == arrow::fs::FileType::File) {
324  file_paths.emplace(file_info.path());
325  }
326  }
327  if (file_paths.empty()) {
328  throw std::runtime_error{"No file found at given path \"" + base_path + "\"."};
329  }
330  }
331  return file_paths;
332 }
std::shared_ptr< arrow::fs::FileSystem > file_system_
#define DEBUG_TIMER(name)
Definition: Logger.h:313
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getChunkLoader()

import_export::Loader * foreign_storage::ParquetDataWrapper::getChunkLoader ( Catalog_Namespace::Catalog &  catalog,
const Interval< ColumnType > &  column_interval,
const int  db_id,
const int  fragment_index,
std::map< ChunkKey, AbstractBuffer *> &  required_buffers 
)
private

Definition at line 496 of file ParquetDataWrapper.cpp.

References foreign_storage::Interval< T >::end, foreign_table_, loadChunk(), foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetImporter().

501  {
502  auto callback =
503  [this, column_interval, db_id, fragment_index, &required_buffers](
504  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
505  import_buffers,
506  std::vector<DataBlockPtr>& data_blocks,
507  size_t import_row_count) {
508  for (int column_id = column_interval.start; column_id <= column_interval.end;
509  column_id++) {
510  // Column ids start at 1, hence the -1 offset
511  auto& import_buffer = import_buffers[column_id - 1];
512  ChunkKey chunk_key{db_id, foreign_table_->tableId, column_id, fragment_index};
513  loadChunk(import_buffer->getColumnDesc(),
514  chunk_key,
515  data_blocks[column_id - 1],
516  import_row_count,
517  required_buffers);
518  }
519  return true;
520  };
521 
522  return new import_export::Loader(catalog, foreign_table_, callback, false);
523 }
T const end
Definition: Intervals.h:67
void loadChunk(const ColumnDescriptor *column, const ChunkKey &chunk_key, DataBlockPtr &data_block, const size_t import_count, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
std::vector< int > ChunkKey
Definition: types.h:37
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getColumnsToInitialize()

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 106 of file ParquetDataWrapper.cpp.

References Catalog_Namespace::Catalog::checkedGet(), db_id_, foreign_storage::Interval< T >::end, schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

107  {
108  const auto catalog = Catalog_Namespace::Catalog::checkedGet(db_id_);
109  const auto& columns = schema_->getLogicalAndPhysicalColumns();
110  auto column_start = column_interval.start;
111  auto column_end = column_interval.end;
112  std::list<const ColumnDescriptor*> columns_to_init;
113  for (const auto column : columns) {
114  auto column_id = column->columnId;
115  if (column_id >= column_start && column_id <= column_end) {
116  columns_to_init.push_back(column);
117  }
118  }
119  return columns_to_init;
120 }
std::unique_ptr< ForeignTableSchema > schema_
T const end
Definition: Intervals.h:67
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getConfiguredFilePath()

std::string foreign_storage::ParquetDataWrapper::getConfiguredFilePath ( ) const
private

Definition at line 274 of file ParquetDataWrapper.cpp.

References foreign_storage::ForeignServer::BASE_PATH_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and UNREACHABLE.

Referenced by getAllFilePaths(), and validateFilePath().

274  {
275  auto& server_options = foreign_table_->foreign_server->options;
276  std::string base_path;
277  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
279  auto base_path_entry = server_options.find(ForeignServer::BASE_PATH_KEY);
280  if (base_path_entry == server_options.end()) {
281  throw std::runtime_error{"No base path found in foreign server options."};
282  }
283  base_path = base_path_entry->second;
284  } else {
285  UNREACHABLE();
286  }
287 
288  auto file_path_entry = foreign_table_->options.find("FILE_PATH");
289  std::string file_path{};
290  if (file_path_entry != foreign_table_->options.end()) {
291  file_path = file_path_entry->second;
292  }
293  const std::string separator{boost::filesystem::path::preferred_separator};
294  return std::regex_replace(
295  base_path + separator + file_path, std::regex{separator + "{2,}"}, separator);
296 }
std::map< std::string, std::string, std::less<> > options
#define UNREACHABLE()
Definition: Logger.h:241
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
const ForeignServer * foreign_server
Definition: ForeignTable.h:39
static constexpr std::string_view BASE_PATH_KEY
Definition: ForeignServer.h:44
+ Here is the caller graph for this function:

◆ getMetadataLoader()

import_export::Loader * foreign_storage::ParquetDataWrapper::getMetadataLoader ( Catalog_Namespace::Catalog &  catalog,
const ParquetLoaderMetadata &  parquet_loader_metadata 
)
private

Definition at line 525 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK, CHECK_EQ, db_id_, foreign_storage::ParquetLoaderMetadata::file_path, foreign_table_, isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, loadMetadataChunk(), moveToNextFragment(), foreign_storage::ParquetLoaderMetadata::row_group_metadata_vector, TableDescriptor::tableId, and total_row_count_.

Referenced by fetchChunkMetadata().

527  {
528  auto callback =
529  [this, &parquet_loader_metadata](
530  const std::vector<std::unique_ptr<import_export::TypedImportBuffer>>&
531  import_buffers,
532  std::vector<DataBlockPtr>& data_blocks,
533  size_t import_row_count) {
534  CHECK(!parquet_loader_metadata.row_group_metadata_vector.empty());
535  int row_group =
536  parquet_loader_metadata.row_group_metadata_vector[0].row_group_index;
537  if (moveToNextFragment(import_row_count)) {
538  addNewFragment(row_group, parquet_loader_metadata.file_path);
539  } else if (isNewFile(parquet_loader_metadata.file_path)) {
540  CHECK_EQ(row_group, 0);
541  addNewFile(parquet_loader_metadata.file_path);
542  }
543  last_row_group_ = row_group;
544 
545  for (size_t i = 0; i < import_buffers.size(); i++) {
546  auto& import_buffer = import_buffers[i];
547  const auto column = import_buffer->getColumnDesc();
548  auto column_id = column->columnId;
549  ChunkKey chunk_key{
551  const auto& metadata = parquet_loader_metadata.row_group_metadata_vector[i];
552  CHECK(metadata.row_group_index == row_group);
553  CHECK(metadata.metadata_only);
554  loadMetadataChunk(column,
555  chunk_key,
556  data_blocks[i],
557  metadata.num_elements,
558  metadata.has_nulls,
559  metadata.is_all_nulls,
560  metadata.array_stats);
561  }
562 
563  last_fragment_row_count_ += import_row_count;
564  total_row_count_ += import_row_count;
565  return true;
566  };
567 
568  return new import_export::Loader(catalog, foreign_table_, callback, false);
569 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
bool isNewFile(const std::string &file_path) const
void addNewFile(const std::string &file_path)
void addNewFragment(int row_group, const std::string &file_path)
void loadMetadataChunk(const ColumnDescriptor *column, const ChunkKey &chunk_key, DataBlockPtr &data_block, const size_t import_count, const bool has_nulls, const bool is_all_nulls, const ArrayMetadataStats &array_stats)
bool moveToNextFragment(size_t new_rows_count) const
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getProcessedFilePaths()

std::set< std::string > foreign_storage::ParquetDataWrapper::getProcessedFilePaths ( )
private

Definition at line 298 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

298  {
299  std::set<std::string> file_paths;
300  for (const auto& entry : fragment_to_row_group_interval_map_) {
301  for (const auto& row_group_interval : entry.second) {
302  file_paths.emplace(row_group_interval.file_path);
303  }
304  }
305  return file_paths;
306 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
+ Here is the caller graph for this function:

◆ getSupportedOptions()

std::vector< std::string_view > foreign_storage::ParquetDataWrapper::getSupportedOptions ( )
static

Definition at line 82 of file ParquetDataWrapper.cpp.

References supported_options_.

Referenced by CreateForeignTableCommand::setTableDetails().

82  {
83  return std::vector<std::string_view>{supported_options_.begin(),
84  supported_options_.end()};
85 }
static constexpr std::array< char const *, 4 > supported_options_
+ Here is the caller graph for this function:

◆ initializeChunkBuffers()

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
std::map< ChunkKey, AbstractBuffer *> &  required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 122 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader(), and loadBuffersUsingLazyParquetImporter().

126  {
127  for (const auto column : getColumnsToInitialize(column_interval)) {
128  Chunk_NS::Chunk chunk{column};
129  ChunkKey data_chunk_key;
130  if (column->columnType.is_varlen_indeed()) {
131  data_chunk_key = {
132  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
133  auto data_buffer = required_buffers[data_chunk_key];
134  CHECK(data_buffer);
135  chunk.setBuffer(data_buffer);
136 
137  ChunkKey index_chunk_key{
138  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
139  auto index_buffer = required_buffers[index_chunk_key];
140  CHECK(index_buffer);
141  chunk.setIndexBuffer(index_buffer);
142  } else {
143  data_chunk_key = {
144  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
145  auto data_buffer = required_buffers[data_chunk_key];
146  CHECK(data_buffer);
147  chunk.setBuffer(data_buffer);
148  }
149  chunk.initEncoder();
150  if (reserve_buffers_and_set_stats) {
151  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
152  CHECK(metadata_it != chunk_metadata_map_.end());
153  auto buffer = chunk.getBuffer();
154  auto& metadata = metadata_it->second;
155  auto encoder = buffer->getEncoder();
156  encoder->resetChunkStats(metadata->chunkStats);
157  encoder->setNumElems(metadata->numElements);
158  if (column->columnType.is_string() &&
159  column->columnType.get_compression() == kENCODING_NONE) {
160  auto index_buffer = chunk.getIndexBuf();
161  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
162  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
163  auto index_buffer = chunk.getIndexBuf();
164  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
165  } else {
166  size_t num_bytes_to_reserve =
167  metadata->numElements * column->columnType.get_size();
168  buffer->reserve(num_bytes_to_reserve);
169  }
170  }
171  }
172 }
int32_t StringOffsetT
Definition: sqltypes.h:868
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
int32_t ArrayOffsetT
Definition: sqltypes.h:869
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ isNewFile()

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 191 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by getMetadataLoader().

191  {
192  const auto last_fragment_entry =
194  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
195 
196  // The entry for the first fragment starts out as an empty vector
197  if (last_fragment_entry->second.empty()) {
199  return true;
200  } else {
201  return (last_fragment_entry->second.back().file_path != file_path);
202  }
203 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

◆ isRestored()

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 803 of file ParquetDataWrapper.cpp.

References is_restored_.

803  {
804  return is_restored_;
805 }

◆ loadBuffersUsingLazyParquetChunkLoader()

void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int  logical_column_id,
const int  fragment_id,
std::map< ChunkKey, AbstractBuffer *> &  required_buffers 
)
private

Definition at line 584 of file ParquetDataWrapper.cpp.

References CHECK, Catalog_Namespace::Catalog::checkedGet(), chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, foreign_storage::Interval< T >::end, file_system_, foreign_table_, fragment_to_row_group_interval_map_, TableDescriptor::fragmenter, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), SQLTypeInfo::get_physical_cols(), initializeChunkBuffers(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

587  {
589  const ColumnDescriptor* logical_column =
590  schema_->getColumnDescriptor(logical_column_id);
591  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
592 
593  const Interval<ColumnType> column_interval = {
594  logical_column_id,
595  logical_column_id + logical_column->columnType.get_physical_cols()};
596  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
597 
598  const auto& row_group_intervals = fragment_to_row_group_interval_map_[fragment_id];
599 
600  const bool is_dictionary_encoded_string_column =
601  logical_column->columnType.is_dict_encoded_string() ||
602  (logical_column->columnType.is_array() &&
603  logical_column->columnType.get_elem_type().is_dict_encoded_string());
604 
605  StringDictionary* string_dictionary = nullptr;
606  if (is_dictionary_encoded_string_column) {
607  auto dict_descriptor = catalog->getMetadataForDictUnlocked(
608  logical_column->columnType.get_comp_param(), true);
609  CHECK(dict_descriptor);
610  string_dictionary = dict_descriptor->stringDict.get();
611  }
612 
613  std::list<Chunk_NS::Chunk> chunks;
614  for (int column_id = column_interval.start; column_id <= column_interval.end;
615  ++column_id) {
616  auto column_descriptor = schema_->getColumnDescriptor(column_id);
617  Chunk_NS::Chunk chunk{column_descriptor};
618  if (column_descriptor->columnType.is_varlen_indeed()) {
619  ChunkKey data_chunk_key = {
620  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
621  auto buffer = required_buffers[data_chunk_key];
622  CHECK(buffer);
623  chunk.setBuffer(buffer);
624  ChunkKey index_chunk_key = {
625  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
626  auto index_buffer = required_buffers[index_chunk_key];
627  CHECK(index_buffer);
628  chunk.setIndexBuffer(index_buffer);
629  } else {
630  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
631  auto buffer = required_buffers[chunk_key];
632  CHECK(buffer);
633  chunk.setBuffer(buffer);
634  }
635  chunks.emplace_back(chunk);
636  }
637 
638  LazyParquetChunkLoader chunk_loader(file_system_);
639  auto metadata = chunk_loader.loadChunk(
640  row_group_intervals, parquet_column_index, chunks, string_dictionary);
641  auto fragmenter = foreign_table_->fragmenter;
642  if (fragmenter) {
643  auto metadata_iter = metadata.begin();
644  for (int column_id = column_interval.start; column_id <= column_interval.end;
645  ++column_id, ++metadata_iter) {
646  auto column = schema_->getColumnDescriptor(column_id);
647  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
648  if (column->columnType.is_varlen_indeed()) {
649  data_chunk_key.emplace_back(1);
650  }
651  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
652  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
653  auto updated_metadata = std::make_shared<ChunkMetadata>();
654  *updated_metadata = *cached_metadata;
655  // for certain types, update the metadata statistics
656  if (is_dictionary_encoded_string_column ||
657  logical_column->columnType.is_geometry()) {
658  CHECK(metadata_iter != metadata.end());
659  auto& chunk_metadata_ptr = *metadata_iter;
660  updated_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
661  updated_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
662  }
663  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
664  updated_metadata->numBytes = required_buffers[data_chunk_key]->size();
665  fragmenter->updateColumnChunkMetadata(column, fragment_id, updated_metadata);
666  }
667  }
668 }
bool is_array() const
Definition: sqltypes.h:425
std::unique_ptr< ForeignTableSchema > schema_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:268
T const end
Definition: Intervals.h:67
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
bool is_dict_encoded_string() const
Definition: sqltypes.h:444
int get_physical_cols() const
Definition: sqltypes.h:280
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
specifies the content in-memory of a row in the column metadata table
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer *> &required_buffers, const bool reserve_buffers_and_set_stats=false)
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool is_geometry() const
Definition: sqltypes.h:429
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:624
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ loadBuffersUsingLazyParquetImporter()

void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetImporter ( const int  logical_column_id,
const int  fragment_id,
std::map< ChunkKey, AbstractBuffer *> &  required_buffers 
)
private

Definition at line 670 of file ParquetDataWrapper.cpp.

References Catalog_Namespace::Catalog::checkedGet(), ColumnDescriptor::columnId, ColumnDescriptor::columnType, db_id_, file_system_, fragment_to_row_group_interval_map_, SQLTypeInfo::get_physical_cols(), getChunkLoader(), initializeChunkBuffers(), schema_, and validateAndGetCopyParams().

Referenced by populateChunkBuffers().

673  {
675  const ColumnDescriptor* logical_column =
676  schema_->getColumnDescriptor(logical_column_id);
677  const Interval<ColumnType> column_interval = {
678  logical_column->columnId,
679  logical_column->columnId + logical_column->columnType.get_physical_cols()};
680  initializeChunkBuffers(fragment_id, column_interval, required_buffers);
681 
682  ParquetLoaderMetadata parquet_loader_metadata;
683  LazyParquetImporter importer(
684  getChunkLoader(*catalog, column_interval, db_id_, fragment_id, required_buffers),
685  {},
686  file_system_,
688  parquet_loader_metadata,
689  *schema_);
690  const auto& row_group_intervals = fragment_to_row_group_interval_map_[fragment_id];
691  importer.partialImport(row_group_intervals, column_interval);
692 }
std::unique_ptr< ForeignTableSchema > schema_
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
int get_physical_cols() const
Definition: sqltypes.h:280
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3770
specifies the in-memory content of a row in the column metadata table
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer *> &required_buffers, const bool reserve_buffers_and_set_stats=false)
std::shared_ptr< arrow::fs::FileSystem > file_system_
import_export::Loader * getChunkLoader(Catalog_Namespace::Catalog &catalog, const Interval< ColumnType > &column_interval, const int db_id, const int fragment_index, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
import_export::CopyParams validateAndGetCopyParams() const
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ loadChunk()

void foreign_storage::ParquetDataWrapper::loadChunk ( const ColumnDescriptor column,
const ChunkKey chunk_key,
DataBlockPtr data_block,
const size_t  import_count,
std::map< ChunkKey, AbstractBuffer *> &  required_buffers 
)
private

Definition at line 462 of file ParquetDataWrapper.cpp.

References CHECK, CHUNK_KEY_COLUMN_IDX, ColumnDescriptor::columnId, and ColumnDescriptor::columnType.

Referenced by getChunkLoader().

467  {
468  Chunk_NS::Chunk chunk{column};
469  auto column_id = column->columnId;
470  CHECK(column_id == chunk_key[CHUNK_KEY_COLUMN_IDX]);
471  auto& type_info = column->columnType;
472  if (type_info.is_varlen_indeed()) {
473  ChunkKey data_chunk_key{chunk_key};
474  data_chunk_key.resize(5);
475  data_chunk_key[4] = 1;
476  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
477  auto& data_buffer = required_buffers[data_chunk_key];
478  chunk.setBuffer(data_buffer);
479 
480  ChunkKey index_chunk_key{chunk_key};
481  index_chunk_key.resize(5);
482  index_chunk_key[4] = 2;
483  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
484  auto& index_buffer = required_buffers[index_chunk_key];
485  chunk.setIndexBuffer(index_buffer);
486  } else {
487  CHECK(required_buffers.find(chunk_key) != required_buffers.end());
488  auto& buffer = required_buffers[chunk_key];
489  chunk.setBuffer(buffer);
490  }
491  chunk.appendData(data_block, import_count, 0);
492  chunk.setBuffer(nullptr);
493  chunk.setIndexBuffer(nullptr);
494 }
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:37
SQLTypeInfo columnType
+ Here is the caller graph for this function:

◆ loadMetadataChunk()

void foreign_storage::ParquetDataWrapper::loadMetadataChunk ( const ColumnDescriptor column,
const ChunkKey chunk_key,
DataBlockPtr data_block,
const size_t  import_count,
const bool  has_nulls,
const bool  is_all_nulls,
const ArrayMetadataStats array_stats 
)
private

Definition at line 397 of file ParquetDataWrapper.cpp.

References CHUNK_KEY_COLUMN_IDX, chunk_metadata_map_, ColumnDescriptor::columnType, Data_Namespace::AbstractBuffer::getEncoder(), foreign_storage::ArrayMetadataStats::getMax(), foreign_storage::ArrayMetadataStats::getMin(), ChunkStats::has_nulls, Data_Namespace::AbstractBuffer::initEncoder(), ChunkStats::max, ChunkStats::min, Encoder::resetChunkStats(), schema_, Data_Namespace::AbstractBuffer::setSize(), foreign_storage::ArrayMetadataStats::updateStats(), and updateStatsForEncoder().

Referenced by getMetadataLoader().

403  {
404  auto type_info = column->columnType;
405  ChunkKey data_chunk_key = chunk_key;
406  if (type_info.is_varlen_indeed()) {
407  data_chunk_key.emplace_back(1);
408  }
409  ForeignStorageBuffer buffer;
410  buffer.initEncoder(type_info);
411  auto encoder = buffer.getEncoder();
412  if (chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end()) {
413  encoder->resetChunkStats(chunk_metadata_map_[data_chunk_key]->chunkStats);
414  encoder->setNumElems(chunk_metadata_map_[data_chunk_key]->numElements);
415  buffer.setSize(chunk_metadata_map_[data_chunk_key]->numBytes);
416  } else {
417  chunk_metadata_map_[data_chunk_key] = std::make_shared<ChunkMetadata>();
418  if (type_info.is_array()) {
419  // ChunkStats for arrays must be manually initialized as the encoders do
420  // not initialize them
421  ArrayMetadataStats array_stats_local;
422  auto sub_type_info = type_info.get_elem_type();
423  chunk_metadata_map_[data_chunk_key]->fillChunkStats(
424  array_stats_local.getMin(sub_type_info),
425  array_stats_local.getMax(sub_type_info),
426  false);
427  }
428  }
429 
430  auto logical_type_info =
431  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnType;
432  if (is_all_nulls || logical_type_info.is_string() || logical_type_info.is_varlen()) {
433  // Do not attempt to load min/max statistics if entire row group is null or
434  // if the column is a string or variable length column
435  encoder->setNumElems(encoder->getNumElems() + import_count);
436  } else {
437  // Loads min/max statistics for columns with this information
438  updateStatsForEncoder(encoder, type_info, data_block, 2);
439  encoder->setNumElems(encoder->getNumElems() + import_count - 2);
440  }
441  if (type_info.is_array()) {
442  // For array types, resetChunkStats is not implemented, so we must fill
443  // metadata manually
444  auto chunk_metadata = chunk_metadata_map_[data_chunk_key];
445  ChunkStats saved_chunk_stats =
446  chunk_metadata
447  ->chunkStats; // save a copy of chunk stats before they are clobbered
448  encoder->getMetadata(chunk_metadata);
449  auto sub_type_info = type_info.get_elem_type();
450  auto array_stats_local = array_stats;
451  array_stats_local.updateStats(
452  sub_type_info, saved_chunk_stats.min, saved_chunk_stats.max);
453  chunk_metadata->fillChunkStats(array_stats_local.getMin(sub_type_info),
454  array_stats_local.getMax(sub_type_info),
455  saved_chunk_stats.has_nulls);
456  } else {
457  encoder->getMetadata(chunk_metadata_map_[data_chunk_key]);
458  }
459  chunk_metadata_map_[data_chunk_key]->chunkStats.has_nulls |= has_nulls;
460 }
std::unique_ptr< ForeignTableSchema > schema_
void updateStatsForEncoder(Encoder *encoder, const SQLTypeInfo type_info, const DataBlockPtr &data_block, const size_t import_count)
bool has_nulls
Definition: ChunkMetadata.h:28
void initEncoder(const SQLTypeInfo &tmp_sql_type)
virtual bool resetChunkStats(const ChunkStats &)
Resets chunk-level stats (min, max, nulls) using the new values from the argument.
Definition: Encoder.h:212
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void setSize(const size_t size)
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
std::vector< int > ChunkKey
Definition: types.h:37
SQLTypeInfo columnType
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ moveToNextFragment()

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 571 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by getMetadataLoader().

571  {
572  return (last_fragment_row_count_ + new_rows_count) >
573  static_cast<size_t>(foreign_table_->maxFragRows);
574 }
+ Here is the caller graph for this function:

◆ populateChunkBuffers()

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( std::map< ChunkKey, AbstractBuffer *> &  required_buffers,
std::map< ChunkKey, AbstractBuffer *> &  optional_buffers 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers- chunk buffers that must always be populated
optional_buffers - chunk buffers that can optionally be populated if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 694 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, ColumnDescriptor::columnType, file_system_, fragment_to_row_group_interval_map_, foreign_storage::get_column_descriptor(), SQLTypeInfo::get_type_name(), SQLTypeInfo::is_array(), foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported(), loadBuffersUsingLazyParquetChunkLoader(), loadBuffersUsingLazyParquetImporter(), foreign_storage::open_parquet_table(), and schema_.

696  {
697  CHECK(!required_buffers.empty());
698  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
699 
700  std::set<int> logical_column_ids;
701  for (const auto& [chunk_key, buffer] : required_buffers) {
702  CHECK_EQ(fragment_id, chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
703  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
704  const auto column_id =
705  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId;
706  logical_column_ids.emplace(column_id);
707  }
708 
709  // Use metadata from the first file for the
710  // LazyParquetChunkLoader::isColumnMappingSupported() check. This should no longer be
711  // needed when all Parquet encoders are implemented.
715  const std::string& first_file_path =
716  fragment_to_row_group_interval_map_[0].front().file_path;
717  std::unique_ptr<parquet::arrow::FileReader> reader;
718  open_parquet_table(first_file_path, reader, file_system_);
719  for (const auto column_id : logical_column_ids) {
720  const ColumnDescriptor* column_descriptor = schema_->getColumnDescriptor(column_id);
721  auto parquet_column_index = schema_->getParquetColumnIndex(column_id);
723  column_descriptor,
724  get_column_descriptor(reader.get(), parquet_column_index))) {
725  loadBuffersUsingLazyParquetChunkLoader(column_id, fragment_id, required_buffers);
726  } else {
727  if (column_descriptor->columnType
728  .is_array()) { // arrays are not supported in LazyParquetImporter
729  auto parquet_column = get_column_descriptor(reader.get(), parquet_column_index);
730  std::string parquet_type;
731  if (parquet_column->logical_type()->is_none()) {
732  parquet_type = parquet::TypeToString(parquet_column->physical_type());
733  } else {
734  parquet_type = parquet_column->logical_type()->ToString();
735  }
736  throw std::runtime_error{
737  "Conversion from Parquet type \"" + parquet_type + "\" to OmniSci type \"" +
738  column_descriptor->columnType.get_type_name() +
739  "\" is not allowed. Please use an appropriate column type."};
740  }
741  loadBuffersUsingLazyParquetImporter(column_id, fragment_id, required_buffers);
742  }
743  }
744 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
bool is_array() const
Definition: sqltypes.h:425
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::unique_ptr< ForeignTableSchema > schema_
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
specifies the in-memory content of a row in the column metadata table
std::shared_ptr< arrow::fs::FileSystem > file_system_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::string get_type_name() const
Definition: sqltypes.h:362
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
#define CHECK(condition)
Definition: Logger.h:197
void open_parquet_table(const std::string &file_path, std::unique_ptr< parquet::arrow::FileReader > &reader, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo columnType
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
void loadBuffersUsingLazyParquetImporter(const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer *> &required_buffers)
+ Here is the call graph for this function:

◆ populateChunkMetadata()

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector chunk_metadata_vector)
overridevirtual

Populates the given chunk metadata vector with metadata for all chunks in the related foreign table.

Parameters
chunk_metadata_vector- vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 576 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

577  {
579  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
580  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
581  }
582 }
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
+ Here is the call graph for this function:

◆ resetParquetMetadata()

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 96 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata().

+ Here is the caller graph for this function:

◆ restoreDataWrapperInternals()

void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeDataWrapperInternals()
chunk_metadata_vector- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 782 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

784  {
785  auto d = json_utils::read_from_file(file_path);
786  CHECK(d.IsObject());
787 
789  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
791  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
793  d, last_fragment_row_count_, "last_fragment_row_count");
795 
796  CHECK(chunk_metadata_map_.empty());
797  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
798  chunk_metadata_map_[chunk_key] = chunk_metadata;
799  }
800  is_restored_ = true;
801 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
rapidjson::Document read_from_file(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ serializeDataWrapperInternals()

void foreign_storage::ParquetDataWrapper::serializeDataWrapperInternals ( const std::string &  file_path) const
overridevirtual

Serializes the internal state of the wrapper into a file at the given path, if implemented.

Parameters
file_path - location to save the file to

Implements foreign_storage::ForeignDataWrapper.

Definition at line 762 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_file().

763  {
764  rapidjson::Document d;
765  d.SetObject();
766 
769  "fragment_to_row_group_interval_map",
770  d.GetAllocator());
771  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
773  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
775  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
777  d, total_row_count_, "total_row_count", d.GetAllocator());
778 
779  json_utils::write_to_file(d, file_path);
780 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
+ Here is the call graph for this function:

◆ updateStatsForEncoder()

void foreign_storage::ParquetDataWrapper::updateStatsForEncoder ( Encoder encoder,
const SQLTypeInfo  type_info,
const DataBlockPtr data_block,
const size_t  import_count 
)
private

Definition at line 367 of file ParquetDataWrapper.cpp.

References DataBlockPtr::arraysPtr, CHECK, SQLTypeInfo::get_type(), Encoder::getNumElems(), SQLTypeInfo::is_varlen(), kARRAY, kCHAR, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, kTEXT, kVARCHAR, DataBlockPtr::numbersPtr, Encoder::setNumElems(), DataBlockPtr::stringsPtr, UNREACHABLE, and Encoder::updateStats().

Referenced by loadMetadataChunk().

370  {
371  CHECK(encoder);
372  if (type_info.is_varlen()) {
373  switch (type_info.get_type()) {
374  case kARRAY: {
375  encoder->updateStats(data_block.arraysPtr, 0, import_count);
376  break;
377  }
378  case kTEXT:
379  case kVARCHAR:
380  case kCHAR:
381  case kPOINT:
382  case kLINESTRING:
383  case kPOLYGON:
384  case kMULTIPOLYGON: {
385  encoder->updateStats(data_block.stringsPtr, 0, import_count);
386  break;
387  }
388  default:
389  UNREACHABLE();
390  }
391  } else {
392  encoder->updateStats(data_block.numbersPtr, import_count);
393  }
394  encoder->setNumElems(encoder->getNumElems() + import_count);
395 }
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
#define UNREACHABLE()
Definition: Logger.h:241
bool is_varlen() const
Definition: sqltypes.h:432
void setNumElems(const size_t num_elems)
Definition: Encoder.h:215
Definition: sqltypes.h:54
Definition: sqltypes.h:43
size_t getNumElems() const
Definition: Encoder.h:214
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
int8_t * numbersPtr
Definition: sqltypes.h:149
virtual void updateStats(const int64_t val, const bool is_null)=0
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ validateAndGetCopyParams()

import_export::CopyParams foreign_storage::ParquetDataWrapper::validateAndGetCopyParams ( ) const
private

Definition at line 334 of file ParquetDataWrapper.cpp.

References import_export::CopyParams::array_delim, and validateAndGetStringWithLength().

Referenced by fetchChunkMetadata(), and loadBuffersUsingLazyParquetImporter().

334  {
335  import_export::CopyParams copy_params{};
336  if (const auto& value = validateAndGetStringWithLength("ARRAY_DELIMITER", 1);
337  !value.empty()) {
338  copy_params.array_delim = value[0];
339  }
340  if (const auto& value = validateAndGetStringWithLength("ARRAY_MARKER", 2);
341  !value.empty()) {
342  copy_params.array_begin = value[0];
343  copy_params.array_end = value[1];
344  }
345  // The file_type argument is never utilized in the context of FSI,
346  // for completeness, set the file_type
347  copy_params.file_type = import_export::FileType::PARQUET;
348  return copy_params;
349 }
std::string validateAndGetStringWithLength(const std::string &option_name, const size_t expected_num_chars) const
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ validateAndGetStringWithLength()

std::string foreign_storage::ParquetDataWrapper::validateAndGetStringWithLength ( const std::string &  option_name,
const size_t  expected_num_chars 
) const
private

Validates that the value of the given table option has the expected number of characters. An exception is thrown if the number of characters does not match.

Parameters
option_name- name of table option whose value is validated and returned
expected_num_chars- expected number of characters for option value
Returns
The value of the option if the number of characters matches. Returns an empty string if the table options do not contain the provided option.

Definition at line 351 of file ParquetDataWrapper.cpp.

References foreign_table_, foreign_storage::OptionsContainer::options, and to_string().

Referenced by validateAndGetCopyParams().

353  {
354  if (auto it = foreign_table_->options.find(option_name);
355  it != foreign_table_->options.end()) {
356  if (it->second.length() != expected_num_chars) {
357  throw std::runtime_error{"Value of \"" + option_name +
358  "\" foreign table option has the wrong number of "
359  "characters. Expected " +
360  std::to_string(expected_num_chars) + " character(s)."};
361  }
362  return it->second;
363  }
364  return "";
365 }
std::map< std::string, std::string, std::less<> > options
std::string to_string(char const *&&v)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ validateFilePath()

void foreign_storage::ParquetDataWrapper::validateFilePath ( ) const
private

Definition at line 87 of file ParquetDataWrapper.cpp.

References foreign_storage::ForeignTable::foreign_server, foreign_table_, getConfiguredFilePath(), ddl_utils::IMPORT, foreign_storage::ForeignServer::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::ForeignServer::STORAGE_TYPE_KEY, and ddl_utils::validate_allowed_file_path().

87  {
88  auto& server_options = foreign_table_->foreign_server->options;
89  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
93  }
94 }
std::map< std::string, std::string, std::less<> > options
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:611
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
const ForeignServer * foreign_server
Definition: ForeignTable.h:39
+ Here is the call graph for this function:

◆ validateOptions()

void foreign_storage::ParquetDataWrapper::validateOptions ( const ForeignTable foreign_table)
static

Definition at line 67 of file ParquetDataWrapper.cpp.

References foreign_storage::OptionsContainer::options, foreign_storage::ForeignTable::supported_options, and supported_options_.

Referenced by CreateForeignTableCommand::setTableDetails().

67  {
68  for (const auto& entry : foreign_table->options) {
69  const auto& table_options = foreign_table->supported_options;
70  if (std::find(table_options.begin(), table_options.end(), entry.first) ==
71  table_options.end() &&
72  std::find(supported_options_.begin(), supported_options_.end(), entry.first) ==
73  supported_options_.end()) {
74  throw std::runtime_error{"Invalid foreign table option \"" + entry.first + "\"."};
75  }
76  }
77  ParquetDataWrapper data_wrapper{foreign_table};
78  data_wrapper.validateAndGetCopyParams();
79  data_wrapper.validateFilePath();
80 }
ParquetDataWrapper(const int db_id, const ForeignTable *foreign_table)
static constexpr std::array< char const *, 4 > supported_options_
+ Here is the caller graph for this function:

Member Data Documentation

◆ chunk_metadata_map_

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::ParquetDataWrapper::chunk_metadata_map_
private

◆ db_id_

◆ file_system_

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private

◆ foreign_table_

◆ fragment_to_row_group_interval_map_

◆ is_restored_

bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 142 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

◆ last_fragment_index_

int foreign_storage::ParquetDataWrapper::last_fragment_index_
private

◆ last_fragment_row_count_

size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private

◆ last_row_group_

int foreign_storage::ParquetDataWrapper::last_row_group_
private

◆ schema_

◆ supported_options_

constexpr std::array<char const*, 4> foreign_storage::ParquetDataWrapper::supported_options_
staticprivate
Initial value:
{"BASE_PATH",
"FILE_PATH",
"ARRAY_DELIMITER",
"ARRAY_MARKER"}

Definition at line 146 of file ParquetDataWrapper.h.

Referenced by getSupportedOptions(), and validateOptions().

◆ total_row_count_

size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files: