OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AbstractTextFileDataWrapper.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <map>
20 #include <queue>
21 #include <set>
22 
24 #include "Catalog/CatalogFwd.h"
25 #include "Catalog/ForeignTable.h"
26 #include "DataMgr/Chunk/Chunk.h"
30 
31 namespace foreign_storage {
32 
39  std::queue<ParseBufferRequest> pending_requests;
40  std::queue<ParseBufferRequest>
41  deferred_requests; // holds requests that will be processed in the next iteration
42  // during an iterative file scan
45  std::condition_variable pending_requests_condition;
46  std::queue<ParseBufferRequest> request_pool;
47  std::mutex request_pool_mutex;
48  std::condition_variable request_pool_condition;
50  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
51  std::map<ChunkKey, Chunk_NS::Chunk> cached_chunks;
54 };
55 
57  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map;
58  int32_t fragment_id;
60 
61  mutable std::map<int, std::unique_ptr<std::mutex>> column_id_to_chunk_mutex;
62  mutable std::map<int, std::unique_ptr<std::condition_variable>>
64  mutable std::mutex delete_buffer_mutex;
65 
66  IterativeFileScanParameters(std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
67  int32_t fragment_id,
69  : column_id_to_chunk_map(column_id_to_chunk_map)
70  , fragment_id(fragment_id)
71  , delete_buffer(delete_buffer) {
72  for (const auto& [key, _] : column_id_to_chunk_map) {
73  column_id_to_chunk_mutex[key] = std::make_unique<std::mutex>();
75  std::make_unique<std::condition_variable>();
76  }
77  }
78 
// Returns the mutex registered for the column identified by `col_id`.
// The column must already have an entry in column_id_to_chunk_mutex (the
// constructor adds one for every key of column_id_to_chunk_map); a missing
// entry is treated as a CHECK failure rather than being created on demand.
79  std::mutex& getChunkMutex(const int col_id) const {
80  auto mutex_it = column_id_to_chunk_mutex.find(col_id);
81  CHECK(mutex_it != column_id_to_chunk_mutex.end());
82  return *mutex_it->second;  // map values are std::unique_ptr<std::mutex>
83  }
84 
85  std::condition_variable& getChunkConditionalVariable(const int col_id) const {
86  auto var_it = column_id_to_chunk_conditional_var.find(col_id);
88  return *var_it->second;
89  }
90 };
91 
93  public:
95 
96  AbstractTextFileDataWrapper(const int db_id, const ForeignTable* foreign_table);
97 
98  AbstractTextFileDataWrapper(const int db_id,
99  const ForeignTable* foreign_table,
100  const UserMapping* user_mapping,
101  const bool disable_cache);
102 
103  void populateChunkMetadata(ChunkMetadataVector& chunk_metadata_vector) override;
104 
105  void populateChunkBuffers(const ChunkToBufferMap& required_buffers,
106  const ChunkToBufferMap& optional_buffers,
107  AbstractBuffer* delete_buffer) override;
108 
109  std::string getSerializedDataWrapper() const override;
110 
111  void restoreDataWrapperInternals(const std::string& file_path,
112  const ChunkMetadataVector& chunk_metadata) override;
113  bool isRestored() const override;
114 
116 
118  return INTRA_FRAGMENT;
119  }
120 
// Override that unconditionally reports lazy fragment fetching as enabled.
121  bool isLazyFragmentFetchingEnabled() const override { return true; }
122 
123  struct ResidualBuffer {
124  std::unique_ptr<char[]> residual_data;
125  size_t alloc_size;
128  };
129 
130  protected:
131  virtual const TextFileBufferParser& getFileBufferParser() const = 0;
132  virtual std::optional<size_t> getMaxFileCount() const;
133 
134  private:
135  AbstractTextFileDataWrapper(const ForeignTable* foreign_table);
136 
141  void iterativeFileScan(ChunkMetadataVector& chunk_metadata_vector,
142  IterativeFileScanParameters& file_scan_param);
143 
152  void populateChunks(std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
153  int fragment_id,
154  AbstractBuffer* delete_buffer);
155 
156  void populateChunkMapForColumns(const std::set<const ColumnDescriptor*>& columns,
157  const int fragment_id,
158  const ChunkToBufferMap& buffers,
159  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map);
160 
161  void updateMetadata(std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
162  int fragment_id);
163 
165  const std::set<std::string>& rolled_off_files,
166  const std::map<int32_t, const ColumnDescriptor*>& column_by_id);
167 
168  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> chunk_metadata_map_;
169  std::map<int, FileRegions> fragment_id_to_file_regions_map_;
170 
171  std::unique_ptr<FileReader> file_reader_;
172 
173  const int db_id_;
175 
176  // Data needed for append workflow
177  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers_;
178  // How many rows have been read
179  size_t num_rows_;
180  // What byte offset we left off at in the file_reader
182  // Is this datawrapper restored from disk
184 
186 
187  // Force cache to be disabled
188  const bool disable_cache_;
189 
192 
193  // Tracks the fragment to which the most recently processed request belongs
195 
196  // These parameters may be reused in an iterative file scan
198  size_t buffer_size_;
200 
202 };
203 } // namespace foreign_storage
std::condition_variable & getChunkConditionalVariable(const int col_id) const
void updateRolledOffChunks(const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
std::map< int, std::unique_ptr< std::mutex > > column_id_to_chunk_mutex
virtual const TextFileBufferParser & getFileBufferParser() const =0
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
std::map< int, Chunk_NS::Chunk > & column_id_to_chunk_map
void iterativeFileScan(ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
MetadataScanMultiThreadingParams multi_threading_params_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
virtual std::optional< size_t > getMaxFileCount() const
ParallelismLevel getNonCachedParallelismLevel() const override
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
std::mutex & getChunkMutex(const int col_id) const
std::map< int, std::unique_ptr< std::condition_variable > > column_id_to_chunk_conditional_var
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
ParallelismLevel getCachedParallelismLevel() const override
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
#define CHECK(condition)
Definition: Logger.h:291
IterativeFileScanParameters(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int32_t fragment_id, AbstractBuffer *delete_buffer)
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)