OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetDataWrapper.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <map>
20 #include <unordered_set>
21 #include <vector>
22 
24 #include "DataMgr/Chunk/Chunk.h"
25 #include "DataPreview.h"
26 #include "ForeignDataWrapper.h"
27 #include "ForeignTableSchema.h"
28 #include "ImportExport/Importer.h"
29 #include "Interval.h"
30 #include "LazyParquetChunkLoader.h"
31 
32 namespace arrow {
33 namespace fs {
34 class FileSystem;  // forward declaration; this listing only uses it via std::shared_ptr<arrow::fs::FileSystem>
35 }  // namespace fs
36 } // namespace arrow
37 
38 namespace foreign_storage {
39 
40 using FilePathAndRowGroup = std::pair<std::string, int32_t>;  // (file path, int32_t) pair; key type of the map returned by getRowGroupMetadataMap()
41 
43  public:
45 
46  ParquetDataWrapper(const int db_id,
47  const ForeignTable* foreign_table,
48  const bool do_metadata_stats_validation = true);
49 
53  ParquetDataWrapper(const ForeignTable* foreign_table,
54  std::shared_ptr<arrow::fs::FileSystem> file_system);
55 
56  void populateChunkMetadata(ChunkMetadataVector& chunk_metadata_vector) override;
57 
58  void populateChunkBuffers(const ChunkToBufferMap& required_buffers,
59  const ChunkToBufferMap& optional_buffers,
60  AbstractBuffer* delete_buffer) override;
61 
62  std::string getSerializedDataWrapper() const override;
63 
// Override taking a file path and a ChunkMetadataVector.
// NOTE(review): what is restored from these inputs is defined in the .cpp, not visible here.
64  void restoreDataWrapperInternals(
65  const std::string& file_path,
66  const ChunkMetadataVector& chunk_metadata_vector) override;
67 
68  bool isRestored() const override;
69 
71 
73  return INTRA_FRAGMENT;
74  }
75 
76  DataPreview getDataPreview(const size_t num_rows);
77 
78  private:
79  std::list<const ColumnDescriptor*> getColumnsToInitialize(
80  const Interval<ColumnType>& column_interval);
81  void initializeChunkBuffers(const int fragment_index,
82  const Interval<ColumnType>& column_interval,
83  const ChunkToBufferMap& required_buffers,
84  const bool reserve_buffers_and_set_stats = false);
85  void fetchChunkMetadata();
86  void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id,
87  const int fragment_id,
88  const ChunkToBufferMap& required_buffers,
89  AbstractBuffer* delete_buffer);
90 
91  std::vector<std::string> getOrderedProcessedFilePaths();
92  std::vector<std::string> getAllFilePaths();
93 
94  bool moveToNextFragment(size_t new_rows_count) const;
95 
96  void finalizeFragmentMap();
97  void addNewFragment(int row_group, const std::string& file_path);
98 
99  bool isNewFile(const std::string& file_path) const;
100 
101  void addNewFile(const std::string& file_path);
102 
103  void setLastFileRowCount(const std::string& file_path);
104 
105  void resetParquetMetadata();
106 
107  void metadataScanFiles(const std::vector<std::string>& file_paths);
108 
// Private member taking a list of RowGroupMetadata by const reference; returns nothing.
109  void metadataScanRowGroupMetadata(
110  const std::list<RowGroupMetadata>& row_group_metadata);
111 
112  std::list<RowGroupMetadata> getRowGroupMetadataForFilePaths(
113  const std::vector<std::string>& file_paths) const;
114 
115  std::map<FilePathAndRowGroup, RowGroupMetadata> getRowGroupMetadataMap(
116  const std::vector<std::string>& file_paths) const;
117 
// Private member taking a column interval, a list of shared ChunkMetadata pointers and a
// fragment id; returns nothing.
118  void updateChunkMetadataForFragment(
119  const Interval<ColumnType>& column_interval,
120  const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
121  int32_t fragment_id);
122 
// Private member taking a vector of RowGroupInterval by const reference; returns nothing.
123  void metadataScanRowGroupIntervals(
124  const std::vector<RowGroupInterval>& row_group_intervals);
125 
126  void updateMetadataForRolledOffFiles(const std::set<std::string>& rolled_off_files);
127 
128  void removeMetadataForLastFile(const std::string& last_file_path);
129 
131  std::map<int, std::vector<RowGroupInterval>> fragment_to_row_group_interval_map_;
132  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> chunk_metadata_map_;
133  const int db_id_;
141  std::unique_ptr<ForeignTableSchema> schema_;
142  std::shared_ptr<arrow::fs::FileSystem> file_system_;
143  std::unique_ptr<FileReaderMap> file_reader_cache_;
144 
146 };
147 } // namespace foreign_storage
std::string getSerializedDataWrapper() const override
std::unique_ptr< FileReaderMap > file_reader_cache_
void updateMetadataForRolledOffFiles(const std::set< std::string > &rolled_off_files)
std::vector< std::string > getOrderedProcessedFilePaths()
void setLastFileRowCount(const std::string &file_path)
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::string > getAllFilePaths()
std::unique_ptr< ForeignTableSchema > schema_
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void metadataScanRowGroupIntervals(const std::vector< RowGroupInterval > &row_group_intervals)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
ParallelismLevel getCachedParallelismLevel() const override
void addNewFile(const std::string &file_path)
DataPreview getDataPreview(const size_t num_rows)
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
void removeMetadataForLastFile(const std::string &last_file_path)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
ParallelismLevel getNonCachedParallelismLevel() const override
void metadataScanFiles(const std::vector< std::string > &file_paths)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const
std::pair< std::string, int32_t > FilePathAndRowGroup
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const