OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetDataWrapper.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <map>
20 #include <unordered_set>
21 #include <vector>
22 
24 #include "DataMgr/Chunk/Chunk.h"
25 #include "DataPreview.h"
26 #include "ForeignDataWrapper.h"
27 #include "ForeignTableSchema.h"
28 #include "ImportExport/Importer.h"
29 #include "Interval.h"
30 #include "LazyParquetChunkLoader.h"
31 
32 namespace arrow {
33 namespace fs {
34 class FileSystem;  // forward declaration only; the visible declarations use it solely through std::shared_ptr<arrow::fs::FileSystem>
35 }  // namespace fs
36 } // namespace arrow
37 
38 namespace foreign_storage {
39 
40 using FilePathAndRowGroup = std::pair<std::string, int32_t>;  // (file path, row group) pair; used as the key of the map returned by getRowGroupMetadataMap()
41 
43  public:
45 
46  ParquetDataWrapper(const int db_id,
47  const ForeignTable* foreign_table,
48  const bool do_metadata_stats_validation = true);
49 
53  ParquetDataWrapper(const ForeignTable* foreign_table,
54  std::shared_ptr<arrow::fs::FileSystem> file_system);
55 
56  void populateChunkMetadata(ChunkMetadataVector& chunk_metadata_vector) override;
57 
58  void populateChunkBuffers(const ChunkToBufferMap& required_buffers,
59  const ChunkToBufferMap& optional_buffers,
60  AbstractBuffer* delete_buffer) override;
61 
62  std::string getSerializedDataWrapper() const override;
63 
65  const std::string& file_path,
66  const ChunkMetadataVector& chunk_metadata_vector) override;
67 
68  bool isRestored() const override;
69 
71 
73  return INTRA_FRAGMENT;
74  }
75 
76  void createRenderGroupAnalyzers() override;
77 
78  DataPreview getDataPreview(const size_t num_rows);
79 
80  private:
81  std::list<const ColumnDescriptor*> getColumnsToInitialize(
82  const Interval<ColumnType>& column_interval);
83  void initializeChunkBuffers(const int fragment_index,
84  const Interval<ColumnType>& column_interval,
85  const ChunkToBufferMap& required_buffers,
86  const bool reserve_buffers_and_set_stats = false);
87  void fetchChunkMetadata();
88  void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id,
89  const int fragment_id,
90  const ChunkToBufferMap& required_buffers,
91  AbstractBuffer* delete_buffer);
92 
93  std::vector<std::string> getOrderedProcessedFilePaths();
94  std::vector<std::string> getAllFilePaths();
95 
96  bool moveToNextFragment(size_t new_rows_count) const;
97 
98  void finalizeFragmentMap();
99  void addNewFragment(int row_group, const std::string& file_path);
100 
101  bool isNewFile(const std::string& file_path) const;
102 
103  void addNewFile(const std::string& file_path);
104 
105  void setLastFileRowCount(const std::string& file_path);
106 
107  void resetParquetMetadata();
108 
109  void metadataScanFiles(const std::vector<std::string>& file_paths);
110 
112  const std::list<RowGroupMetadata>& row_group_metadata);
113 
114  std::list<RowGroupMetadata> getRowGroupMetadataForFilePaths(
115  const std::vector<std::string>& file_paths) const;
116 
117  std::map<FilePathAndRowGroup, RowGroupMetadata> getRowGroupMetadataMap(
118  const std::vector<std::string>& file_paths) const;
119 
121  const Interval<ColumnType>& column_interval,
122  const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
123  int32_t fragment_id);
124 
126  const std::vector<RowGroupInterval>& row_group_intervals);
127 
128  void updateMetadataForRolledOffFiles(const std::set<std::string>& rolled_off_files);
129 
130  void removeMetadataForLastFile(const std::string& last_file_path);
131 
133  std::map<int, std::vector<RowGroupInterval>> fragment_to_row_group_interval_map_;
134  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> chunk_metadata_map_;
135  const int db_id_;
143  std::unique_ptr<ForeignTableSchema> schema_;
144  std::shared_ptr<arrow::fs::FileSystem> file_system_;
145  std::unique_ptr<FileReaderMap> file_reader_cache_;
146 
148 
149  // declared in three derived classes to avoid
150  // polluting ForeignDataWrapper virtual base
151  // @TODO refactor to lower class if needed
153 };
154 } // namespace foreign_storage
std::string getSerializedDataWrapper() const override
std::unique_ptr< FileReaderMap > file_reader_cache_
void updateMetadataForRolledOffFiles(const std::set< std::string > &rolled_off_files)
std::vector< std::string > getOrderedProcessedFilePaths()
void setLastFileRowCount(const std::string &file_path)
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::string > getAllFilePaths()
std::unique_ptr< ForeignTableSchema > schema_
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void metadataScanRowGroupIntervals(const std::vector< RowGroupInterval > &row_group_intervals)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
ParallelismLevel getCachedParallelismLevel() const override
void addNewFile(const std::string &file_path)
DataPreview getDataPreview(const size_t num_rows)
RenderGroupAnalyzerMap render_group_analyzer_map_
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
void removeMetadataForLastFile(const std::string &last_file_path)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
ParallelismLevel getNonCachedParallelismLevel() const override
void metadataScanFiles(const std::vector< std::string > &file_paths)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const
std::pair< std::string, int32_t > FilePathAndRowGroup
void createRenderGroupAnalyzers() override
Create RenderGroupAnalyzers for poly columns.
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const