OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetDataWrapper.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <map>
20 #include <unordered_set>
21 #include <vector>
22 
24 #include "Catalog/Catalog.h"
25 #include "Catalog/ForeignTable.h"
26 #include "DataMgr/Chunk/Chunk.h"
27 #include "ForeignDataWrapper.h"
28 #include "ForeignTableSchema.h"
29 #include "ImportExport/Importer.h"
30 #include "Interval.h"
31 #include "LazyParquetChunkLoader.h"
32 
33 namespace foreign_storage {
34 
36  public:
38 
39  ParquetDataWrapper(const int db_id, const ForeignTable* foreign_table);
40 
41  void populateChunkMetadata(ChunkMetadataVector& chunk_metadata_vector) override;
42 
43  void populateChunkBuffers(const ChunkToBufferMap& required_buffers,
44  const ChunkToBufferMap& optional_buffers) override;
45 
46  void serializeDataWrapperInternals(const std::string& file_path) const override;
47 
48  void restoreDataWrapperInternals(
49  const std::string& file_path,
50  const ChunkMetadataVector& chunk_metadata_vector) override;
51 
52  bool isRestored() const override;
53 
55 
57  return INTRA_FRAGMENT;
58  }
59 
60  private:
61  std::list<const ColumnDescriptor*> getColumnsToInitialize(
62  const Interval<ColumnType>& column_interval);
63  void initializeChunkBuffers(const int fragment_index,
64  const Interval<ColumnType>& column_interval,
65  const ChunkToBufferMap& required_buffers,
66  const bool reserve_buffers_and_set_stats = false);
67  void fetchChunkMetadata();
68  void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id,
69  const int fragment_id,
70  const ChunkToBufferMap& required_buffers);
71 
72  std::set<std::string> getProcessedFilePaths();
73  std::set<std::string> getAllFilePaths();
74 
75  bool moveToNextFragment(size_t new_rows_count) const;
76 
77  void finalizeFragmentMap();
78  void addNewFragment(int row_group, const std::string& file_path);
79 
80  bool isNewFile(const std::string& file_path) const;
81 
82  void addNewFile(const std::string& file_path);
83 
84  void resetParquetMetadata();
85 
86  void metadataScanFiles(const std::set<std::string>& file_paths);
87 
88  std::map<int, std::vector<RowGroupInterval>> fragment_to_row_group_interval_map_;
89  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> chunk_metadata_map_;
90  const int db_id_;
97  std::unique_ptr<ForeignTableSchema> schema_;
98  std::shared_ptr<arrow::fs::FileSystem> file_system_;
99  std::unique_ptr<FileReaderMap> file_reader_cache_;
100 };
101 } // namespace foreign_storage
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
std::set< std::string > getAllFilePaths()
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::unique_ptr< ForeignTableSchema > schema_
void serializeDataWrapperInternals(const std::string &file_path) const override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
This file contains the class specification and related data structures for Catalog.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
ParallelismLevel getCachedParallelismLevel() const override
void addNewFile(const std::string &file_path)
void metadataScanFiles(const std::set< std::string > &file_paths)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
ParallelismLevel getNonCachedParallelismLevel() const override
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers)
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override