OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetDataWrapper.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <map>
20 #include <unordered_set>
21 #include <vector>
22 
24 #include "DataMgr/Chunk/Chunk.h"
25 #include "DataPreview.h"
26 #include "ForeignDataWrapper.h"
27 #include "ForeignTableSchema.h"
28 #include "ImportExport/Importer.h"
29 #include "Interval.h"
30 #include "LazyParquetChunkLoader.h"
31 
32 namespace arrow {
33 namespace fs {
34 class FileSystem;
35 }
36 } // namespace arrow
37 
38 namespace foreign_storage {
39 
41  public:
43 
44  ParquetDataWrapper(const int db_id,
45  const ForeignTable* foreign_table,
46  const bool do_metadata_stats_validation = true);
47 
51  ParquetDataWrapper(const ForeignTable* foreign_table,
52  std::shared_ptr<arrow::fs::FileSystem> file_system);
53 
54  void populateChunkMetadata(ChunkMetadataVector& chunk_metadata_vector) override;
55 
56  void populateChunkBuffers(const ChunkToBufferMap& required_buffers,
57  const ChunkToBufferMap& optional_buffers,
58  AbstractBuffer* delete_buffer) override;
59 
60  std::string getSerializedDataWrapper() const override;
61 
63  const std::string& file_path,
64  const ChunkMetadataVector& chunk_metadata_vector) override;
65 
66  bool isRestored() const override;
67 
69 
71  return INTRA_FRAGMENT;
72  }
73 
74  void createRenderGroupAnalyzers() override;
75 
76  DataPreview getDataPreview(const size_t num_rows);
77 
78  private:
79  std::list<const ColumnDescriptor*> getColumnsToInitialize(
80  const Interval<ColumnType>& column_interval);
81  void initializeChunkBuffers(const int fragment_index,
82  const Interval<ColumnType>& column_interval,
83  const ChunkToBufferMap& required_buffers,
84  const bool reserve_buffers_and_set_stats = false);
85  void fetchChunkMetadata();
86  void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id,
87  const int fragment_id,
88  const ChunkToBufferMap& required_buffers,
89  AbstractBuffer* delete_buffer);
90 
91  std::set<std::string> getProcessedFilePaths();
92  std::vector<std::string> getAllFilePaths();
93 
94  bool moveToNextFragment(size_t new_rows_count) const;
95 
96  void finalizeFragmentMap();
97  void addNewFragment(int row_group, const std::string& file_path);
98 
99  bool isNewFile(const std::string& file_path) const;
100 
101  void addNewFile(const std::string& file_path);
102 
103  void resetParquetMetadata();
104 
105  void metadataScanFiles(const std::vector<std::string>& file_paths);
106 
108  std::map<int, std::vector<RowGroupInterval>> fragment_to_row_group_interval_map_;
109  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> chunk_metadata_map_;
110  const int db_id_;
117  std::unique_ptr<ForeignTableSchema> schema_;
118  std::shared_ptr<arrow::fs::FileSystem> file_system_;
119  std::unique_ptr<FileReaderMap> file_reader_cache_;
120 
122 
123  // declared in three derived classes to avoid
124  // polluting ForeignDataWrapper virtual base
125  // @TODO refactor to lower class if needed
127 };
128 } // namespace foreign_storage
std::string getSerializedDataWrapper() const override
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::string > getAllFilePaths()
std::unique_ptr< ForeignTableSchema > schema_
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
ParallelismLevel getCachedParallelismLevel() const override
void addNewFile(const std::string &file_path)
DataPreview getDataPreview(const size_t num_rows)
RenderGroupAnalyzerMap render_group_analyzer_map_
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
ParallelismLevel getNonCachedParallelismLevel() const override
void metadataScanFiles(const std::vector< std::string > &file_paths)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
void createRenderGroupAnalyzers() override
Create RenderGroupAnalyzers for poly columns.