OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileReader.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <optional>
20 
21 #include <boost/filesystem.hpp>
22 #include "rapidjson/document.h"
23 
25 #include "Catalog/ForeignTable.h"
27 
28 namespace foreign_storage {
29 
// Forward declarations: this header only refers to these types through pointers
// (the optional arguments of FileReader::checkForMoreRows), so full definitions
// are not needed here.
struct UserMapping;
struct ForeignServer;

// First line of each file, keyed by that file's path (iteration is ordered by path).
using FirstLineByFilePath = std::map<std::string /* file path */, std::string /* first line */>;
34 
35 // File reader
36 // Supports an initial full scan with calls to read()
37 // When the entire object has been read isScanFinished() returns true
38 // Previously read data can then be re-read with readRegion()
39 class FileReader {
40  public:
41  FileReader(const std::string& file_path, const import_export::CopyParams& copy_params)
42  : copy_params_(copy_params), file_path_(file_path){};
43  virtual ~FileReader() = default;
44 
53  virtual size_t read(void* buffer, size_t max_size) = 0;
54 
58  virtual bool isScanFinished() = 0;
59 
69  virtual size_t readRegion(void* buffer, size_t offset, size_t size) = 0;
70 
74  virtual size_t getRemainingSize() = 0;
75 
79  virtual bool isRemainingSizeKnown() = 0;
89  virtual void checkForMoreRows(size_t file_offset,
90  const ForeignServer* server_options = nullptr,
91  const UserMapping* user_mapping = nullptr) {
92  throw std::runtime_error{"APPEND mode not yet supported for this table."};
93  }
94 
103  virtual void serialize(rapidjson::Value& value,
104  rapidjson::Document::AllocatorType& allocator) const = 0;
105 
109  virtual FirstLineByFilePath getFirstLineForEachFile() const = 0;
110 
115  virtual bool isEndOfLastFile() = 0;
116 
117  protected:
119  std::string file_path_;
120 };
121 
122 class SingleFileReader : public FileReader {
123  public:
124  SingleFileReader(const std::string& file_path,
125  const import_export::CopyParams& copy_params);
126 
127  ~SingleFileReader() override = default;
128 
130 
131  bool isEndOfLastFile() override;
132 
133  protected:
134  virtual std::string getFirstLine() const = 0;
135  virtual void skipHeader() = 0;
136 
137  static constexpr size_t DEFAULT_HEADER_READ_SIZE{1024};
138 };
139 
// NOTE(review): this text was extracted from generated documentation and is
// incomplete. Missing listing lines:
//   - 141: presumably `class SingleTextFileReader : public SingleFileReader {`
//     (it overrides getFirstLine()/skipHeader(), which SingleFileReader declares);
//   - 151-152: presumably the deleted copy constructor and copy assignment;
//   - 165: contents unknown;
//   - 194, 197, 199: presumably the declarations of scan_finished_, header_offset_
//     and total_bytes_read_, which the inline methods below use.
// Restore them from the real FileReader.h before building.
140 // Single uncompressed file that supports fseek() for fast random access (see readRegion()).
142  public:
143  SingleTextFileReader(const std::string& file_path,
144  const import_export::CopyParams& copy_params);
// Second constructor additionally takes a rapidjson::Value, presumably the state
// written by serialize().
145  SingleTextFileReader(const std::string& file_path,
146  const import_export::CopyParams& copy_params,
147  const rapidjson::Value& value);
// Closes the underlying stdio handle.
// NOTE(review): fclose(nullptr) is undefined behavior. Confirm the constructors never
// leave file_ null (e.g. that they throw when fopen fails).
148  ~SingleTextFileReader() override { fclose(file_); }
149
150  // Copying is deleted so that two objects never fclose() the same FILE* (the deleting declarations on listing lines 151-152 are missing from this extract).
153
// Sequential scan: reads up to max_size bytes from the current position into buffer,
// latches scan_finished_ once feof() reports end of file, and adds the byte count to
// total_bytes_read_. Returns the number of bytes read, which may be short at EOF.
// NOTE(review): a read error (ferror) is not distinguished from EOF here.
154  size_t read(void* buffer, size_t max_size) override {
155  size_t bytes_read = fread(buffer, 1, max_size, file_);
156  if (!scan_finished_) {
157  scan_finished_ = feof(file_);
158  }
159
160  total_bytes_read_ += bytes_read;
161  return bytes_read;
162  }
163
// Random access: seeks to offset + header_offset_ and reads up to size bytes into
// buffer, so offsets are relative to the data after the header.
// Throws std::runtime_error (including strerror(errno)) if fseek fails.
// NOTE(review):
//   - The cast to long can overflow for offsets above LONG_MAX (e.g. files over
//     2 GiB where long is 32 bits).
//   - fread errors are not checked.
164  size_t readRegion(void* buffer, size_t offset, size_t size) override {
166  if (fseek(file_, static_cast<long int>(offset + header_offset_), SEEK_SET) != 0) {
167  throw std::runtime_error{"An error occurred when attempting to read offset " +
168  std::to_string(offset) + " in file: \"" + file_path_ +
169  "\". " + strerror(errno)};
170  }
171  return fread(buffer, 1, size, file_);
172  }
173
174  bool isScanFinished() override { return scan_finished_; }
175
// Bytes of data not yet consumed by read().
// NOTE(review): this is an unsigned subtraction and would wrap if total_bytes_read_
// ever exceeded data_size_.
176  size_t getRemainingSize() override { return data_size_ - total_bytes_read_; }
177
// The size of an uncompressed file is known, so getRemainingSize() is meaningful.
178  bool isRemainingSizeKnown() override { return true; };
179  void checkForMoreRows(size_t file_offset,
180  const ForeignServer* server_options,
181  const UserMapping* user_mapping) override;
182
183  void serialize(rapidjson::Value& value,
184  rapidjson::Document::AllocatorType& allocator) const override;
185
186  private:
187  std::string getFirstLine() const override;
188  void skipHeader() override;
189
// Owned stdio handle; released in the destructor.
190  std::FILE* file_;
191  // Size of data in file
192  size_t data_size_;
193  // We've reached the end of the file (the member on line 194, presumably scan_finished_, is missing from this extract)
195
196  // Size of the header in bytes (the member on line 197, presumably header_offset_, is missing from this extract)
198
200 };
201 
// NOTE(review): this text was extracted from generated documentation and is
// incomplete. Missing listing lines:
//   - 202: presumably `class ArchiveWrapper {`;
//   - 206: a constructor initializer, contents unknown;
//   - 222: presumably `size_t currentEntryDataAvailable() const`, per the index;
//   - 247, 249: presumably the declarations of block_chars_remaining_ and
//     current_entry_, which the inline methods use.
// ArchiveWrapper gives sequential, block-by-block access to the entries of an Archive (arch_).
203  public:
// Stores file_path and calls resetArchive(), so reading starts from the first entry.
// NOTE(review): this single-argument constructor is not explicit, so a std::string
// converts implicitly to ArchiveWrapper.
204  ArchiveWrapper(const std::string& file_path)
205  : current_block_(nullptr)
207  , current_entry_(-1)
208  , file_path_(file_path) {
209  resetArchive();
210  }
211
// Moves to the archive entry with the given index (presumably the same indexing as
// getCurrentEntryIndex()).
215  void skipToEntry(int entry_number);
216
217  // Go to next consecutive entry in archive (returns bool, presumably false when no entry remains)
218  bool nextEntry();
219
// True when no chars remain in the current block (block_chars_remaining_ == 0).
220  bool currentEntryFinished() const { return (block_chars_remaining_ == 0); }
221
223
224  // Consume given amount of data from current block, copying into dest_buffer if set
225  void consumeDataFromCurrentEntry(size_t size, char* dest_buffer = nullptr);
226
227  // Return the next char from the buffer without consuming it
228  char peekNextChar();
229
// Index of the current archive entry; the constructor initializes it to -1.
230  int getCurrentEntryIndex() const { return current_entry_; }
231
232  // Reset archive, start reading again from the first entry
233  void resetArchive();
234
// Name of the current archive entry, as reported by arch_.
235  std::string entryName() { return arch_->entryName(); }
236
237  private:
// Presumably loads the next uncompressed block from arch_ into current_block_.
241  void fetchBlock();
242
243  std::unique_ptr<Archive> arch_;
244  // Pointer to current uncompressed block from the archive
245  const void* current_block_;
246  // Number of chars remaining in the current block (the declaration on line 247 is missing from this extract)
248  // Index of entry of the archive file (the declaration on line 249 is missing from this extract)
250
251  std::string file_path_;
252 };
253 
// NOTE(review): this text was extracted from generated documentation and is
// incomplete. Missing listing lines:
//   - 255: the class header, presumably deriving from SingleFileReader, since
//     skipHeader()/getFirstLine() are overridden;
//   - 307, 310, 312, 315, 319: member declarations.
// Some members used below (e.g. scan_finished_) are therefore undeclared in this extract.
254 // Single (compressed) archive; does not support random access
256  public:
257  CompressedFileReader(const std::string& file_path,
258  const import_export::CopyParams& copy_params);
// Second constructor additionally takes a rapidjson::Value, presumably the state
// written by serialize().
259  CompressedFileReader(const std::string& file_path,
260  const import_export::CopyParams& copy_params,
261  const rapidjson::Value& value);
262  size_t read(void* buffer, size_t max_size) override;
263
264  size_t readRegion(void* buffer, size_t offset, size_t size) override;
265
266  bool isScanFinished() override { return scan_finished_; }
267
// The remaining size of an archive is never reported as known, so
// getRemainingSize() simply returns 0.
268  bool isRemainingSizeKnown() override { return false; };
269  size_t getRemainingSize() override { return 0; }
270
271  void serialize(rapidjson::Value& value,
272  rapidjson::Document::AllocatorType& allocator) const override;
273
274  private:
278  void resetArchive();
279
// NOTE(review): this override is private although FileReader::checkForMoreRows is
// public. Calls through a FileReader pointer/reference still dispatch here, but
// direct calls on a CompressedFileReader object do not compile.
280  void checkForMoreRows(size_t file_offset,
281  const ForeignServer* server_options,
282  const UserMapping* user_mapping) override;
283
287  void nextEntry();
288
292  void skipHeader() override;
293
// Presumably advances past n_bytes of data without copying it.
298  void skipBytes(size_t n_bytes);
299
300  // Read bytes in current entry adjusting for EOF
301  size_t readInternal(void* buffer, size_t read_size, size_t buffer_size);
302
303  std::string getFirstLine() const override;
304
// Presumably consumes the first line of the current entry, storing it in dest_str.
305  void consumeFirstLine(std::optional<std::string>& dest_str);
306
308
309  // Are we doing initial scan or an append (the member on line 310 is missing from this extract)
311  // We've reached the end of the last file (the member on line 312, presumably scan_finished_, is missing from this extract)
313
314  // Overall number of bytes read in the archive (minus headers) (the member on line 315 is missing from this extract)
316
317  // Index of current entry in order they appear in
318  // cumulative_sizes_/sourcenames_/archive_entry_index_ (the member on line 319 is missing from this extract)
320
321  // Size of each file + all previous files
322  std::vector<size_t> cumulative_sizes_;
323  // Names of the file in the archive
324  std::vector<std::string> sourcenames_;
325  // Index of the entry in the archive
326  // Order can change during append operation
327  std::vector<int> archive_entry_index_;
328 };
329 
// NOTE(review): this text was extracted from generated documentation and is
// incomplete. Missing listing lines:
//   - 352: presumably `FirstLineByFilePath getFirstLineForEachFile() const override;`,
//     defined at FileReader.cpp:578 per the index;
//   - 363: presumably `current_index_`, used by isScanFinished();
//   - 365, 367: contents unknown.
330 // Combines several file readers (files_) into a single logical data stream
331 class MultiFileReader : public FileReader {
332  public:
333  MultiFileReader(const std::string& file_path,
334  const import_export::CopyParams& copy_params);
// Second constructor additionally takes a rapidjson::Value, presumably the state
// written by serialize().
335  MultiFileReader(const std::string& file_path,
336  const import_export::CopyParams& copy_params,
337  const rapidjson::Value& value);
338
339  size_t getRemainingSize() override;
340
341  bool isRemainingSizeKnown() override;
342
343  size_t read(void* buffer, size_t max_size) override;
344
345  size_t readRegion(void* buffer, size_t offset, size_t size) override;
346
// The scan is finished once current_index_ has moved past the last reader in files_.
347  bool isScanFinished() override { return (current_index_ >= files_.size()); }
348
349  void serialize(rapidjson::Value& value,
350  rapidjson::Document::AllocatorType& allocator) const override;
351
353
354  bool isEndOfLastFile() override;
355
356  protected:
// One reader per file, with file_locations_ holding the corresponding locations
// (presumably index-aligned with files_ — confirm in FileReader.cpp).
357  std::vector<std::unique_ptr<FileReader>> files_;
358  std::vector<std::string> file_locations_;
359
360  // Size of each file + all previous files
361  std::vector<size_t> cumulative_sizes_;
362  // Index into files_ of the file currently being read (the declaration on line 363, presumably current_index_, is missing from this extract)
364  // Overall number of bytes read in the directory (minus headers) (the declaration on line 365 is missing from this extract)
366
368 };
369 
// NOTE(review): the class header on listing line 371 is missing from this extract.
// It is presumably `class LocalMultiFileReader : public MultiFileReader {`.
370 // Reader for a single local file or a local directory containing multiple files
372  public:
// The optional arguments are presumably a regex filter on file paths and the sort
// order / sort regex applied to the matched files. Confirm in FileReader.cpp.
373  LocalMultiFileReader(const std::string& file_path,
374  const import_export::CopyParams& copy_params,
375  const std::optional<std::string>& regex_path_filter,
376  const std::optional<std::string>& file_sort_order_by,
377  const std::optional<std::string>& file_sort_regex);
378
// Takes a rapidjson::Value instead, presumably the state written by serialize().
379  LocalMultiFileReader(const std::string& file_path,
380  const import_export::CopyParams& copy_params,
381  const rapidjson::Value& value);
382
383  void checkForMoreRows(size_t file_offset,
384  const ForeignServer* server_options,
385  const UserMapping* user_mapping) override;
386
387  private:
// Presumably adds a reader for the file at location.
// NOTE(review): the string is taken by value; const std::string& would avoid a copy
// unless the definition moves from it.
388  void insertFile(std::string location);
389 };
390 
391 } // namespace foreign_storage
std::vector< std::string > sourcenames_
Definition: FileReader.h:324
virtual std::string getFirstLine() const =0
virtual size_t read(void *buffer, size_t max_size)=0
std::vector< int > archive_entry_index_
Definition: FileReader.h:327
std::string getFirstLine() const override
Definition: FileReader.cpp:170
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:322
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:239
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:76
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.h:164
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:297
SingleTextFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:84
FileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.h:41
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: FileReader.cpp:265
std::map< std::string, std::string > FirstLineByFilePath
Definition: FileReader.h:33
virtual bool isEndOfLastFile()=0
virtual size_t getRemainingSize()=0
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:519
virtual ~FileReader()=default
std::string to_string(char const *&&v)
void insertFile(std::string location)
Definition: FileReader.cpp:632
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params, const std::optional< std::string > &regex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex)
Definition: FileReader.cpp:590
import_export::CopyParams copy_params_
Definition: FileReader.h:118
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:125
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:505
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: FileReader.cpp:206
bool isRemainingSizeKnown() override
Definition: FileReader.cpp:570
SingleTextFileReader & operator=(const SingleTextFileReader &)=delete
FirstLineByFilePath getFirstLineForEachFile() const override
Definition: FileReader.cpp:578
ArchiveWrapper(const std::string &file_path)
Definition: FileReader.h:204
bool currentEntryFinished() const
Definition: FileReader.h:220
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: FileReader.cpp:710
void consumeFirstLine(std::optional< std::string > &dest_str)
Definition: FileReader.cpp:374
virtual bool isScanFinished()=0
virtual void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const =0
std::unique_ptr< Archive > arch_
Definition: FileReader.h:243
static constexpr size_t DEFAULT_HEADER_READ_SIZE
Definition: FileReader.h:137
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:72
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:690
void skipToEntry(int entry_number)
Definition: FileReader.cpp:182
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:412
#define CHECK(condition)
Definition: Logger.h:209
size_t currentEntryDataAvailable() const
Definition: FileReader.h:222
virtual bool isRemainingSizeKnown()=0
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:135
std::string getFirstLine() const override
Definition: FileReader.cpp:366
size_t getRemainingSize() override
Definition: FileReader.cpp:562
virtual FirstLineByFilePath getFirstLineForEachFile() const =0
std::vector< std::string > file_locations_
Definition: FileReader.h:358
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: FileReader.cpp:544
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.h:154
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:357
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:361
size_t read(void *buffer, size_t max_size) override
Definition: FileReader.cpp:291
virtual void checkForMoreRows(size_t file_offset, const ForeignServer *server_options=nullptr, const UserMapping *user_mapping=nullptr)
Definition: FileReader.h:89
~SingleFileReader() override=default
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: FileReader.cpp:646