OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetShared.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <arrow/api.h>
20 #include <arrow/filesystem/filesystem.h>
21 #include <arrow/io/api.h>
22 #include <parquet/arrow/reader.h>
23 #include <parquet/statistics.h>
24 #include <parquet/types.h>
25 
27 #include "DataMgr/ChunkMetadata.h"
29 
30 namespace foreign_storage {
31 
32 using UniqueReaderPtr = std::unique_ptr<parquet::arrow::FileReader>;
33 using ReaderPtr = parquet::arrow::FileReader*;
34 
/*
 Splits up a set of items to be processed into multiple partitions, with the intention
 that each thread will process a separate part.

 Items are distributed in the set's iteration (sorted) order into consecutive
 partitions of ceil(items.size() / max_threads) items each. At most `max_threads`
 partitions are therefore returned, and only the last one may be smaller than the
 others. An empty `items` yields an empty list. A `max_threads` of 0 is treated as 1
 (everything goes into a single partition) rather than dividing by zero.
 */
// TODO(Misiu): Change this to return a list of Views/Ranges when we support c++20.
template <typename T>
auto partition_for_threads(const std::set<T>& items, size_t max_threads) {
  // Guard against `max_threads - 1` wrapping around and the subsequent division by 0.
  const size_t thread_count = max_threads > 0 ? max_threads : 1;
  const size_t items_per_thread = (items.size() + (thread_count - 1)) / thread_count;
  std::list<std::set<T>> items_by_thread;
  // items_per_thread is only 0 when `items` is empty, in which case the loop never runs.
  size_t i = 0;
  for (const auto& item : items) {
    // Start a new partition every `items_per_thread` items.
    if (i++ % items_per_thread == 0) {
      items_by_thread.emplace_back();
    }
    auto& partition = items_by_thread.back();
    // Items arrive in sorted order, so hinting at end() makes each insert amortized O(1).
    partition.emplace_hint(partition.end(), item);
  }
  return items_by_thread;
}
53 
55  std::string file_path;
56  int start_index{-1}, end_index{-1};
57 };
58 
60  std::string file_path;
62  std::list<std::shared_ptr<ChunkMetadata>> column_chunk_metadata;
63 };
64 
65 UniqueReaderPtr open_parquet_table(const std::string& file_path,
66  std::shared_ptr<arrow::fs::FileSystem>& file_system);
67 
68 std::pair<int, int> get_parquet_table_size(const ReaderPtr& reader);
69 
70 const parquet::ColumnDescriptor* get_column_descriptor(
71  const parquet::arrow::FileReader* reader,
72  const int logical_column_index);
73 
75  const parquet::ColumnDescriptor* reference_descriptor,
76  const parquet::ColumnDescriptor* new_descriptor,
77  const std::string& reference_file_path,
78  const std::string& new_file_path);
79 
80 std::unique_ptr<ColumnDescriptor> get_sub_type_column_descriptor(
81  const ColumnDescriptor* column);
82 
83 std::shared_ptr<parquet::Statistics> validate_and_get_column_metadata_statistics(
84  const parquet::ColumnChunkMetaData* column_metadata);
85 
86 // A cache for parquet FileReaders which locks access for parallel use.
88  public:
89  const ReaderPtr getOrInsert(const std::string& path,
90  std::shared_ptr<arrow::fs::FileSystem>& file_system) {
91  mapd_unique_lock<std::mutex> cache_lock(mutex_);
92  if (map_.count(path) < 1 || !(map_.at(path))) {
93  map_[path] = open_parquet_table(path, file_system);
94  }
95  return map_.at(path).get();
96  }
97 
98  const ReaderPtr insert(const std::string& path,
99  std::shared_ptr<arrow::fs::FileSystem>& file_system) {
100  mapd_unique_lock<std::mutex> cache_lock(mutex_);
101  map_[path] = open_parquet_table(path, file_system);
102  return map_.at(path).get();
103  }
104 
105  void initializeIfEmpty(const std::string& path) {
106  mapd_unique_lock<std::mutex> cache_lock(mutex_);
107  if (map_.count(path) < 1) {
108  map_.emplace(path, UniqueReaderPtr());
109  }
110  }
111 
112  void clear() {
113  mapd_unique_lock<std::mutex> cache_lock(mutex_);
114  map_.clear();
115  }
116 
117  private:
118  mutable std::mutex mutex_;
119  std::map<const std::string, UniqueReaderPtr> map_;
120 };
121 } // namespace foreign_storage
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: ParquetShared.h:41
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
std::list< std::shared_ptr< ChunkMetadata > > column_chunk_metadata
Definition: ParquetShared.h:62
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:89
specifies the content in-memory of a row in the column metadata table
parquet::arrow::FileReader * ReaderPtr
Definition: ParquetShared.h:33
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:98
void initializeIfEmpty(const std::string &path)
std::map< const std::string, UniqueReaderPtr > map_
std::unique_ptr< parquet::arrow::FileReader > UniqueReaderPtr
Definition: ParquetShared.h:32