OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvReader.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/filesystem.hpp>
20 #include "rapidjson/document.h"
21 
24 
25 namespace foreign_storage {
26 
27 struct ForeignServer;
28 struct UserMapping;
29 
30 // Archive reader for csv archives
31 // Supports an initial full scan with calls to read()
32 // When the entire Csv object has been read isScanFinished() returns true
33 // Previously read data can then be re-read with readRegion()
34 class CsvReader {
35  public:
36  CsvReader(const std::string& file_path, const import_export::CopyParams& copy_params)
37  : copy_params_(copy_params), file_path_(file_path){};
38  virtual ~CsvReader() = default;
39 
48  virtual size_t read(void* buffer, size_t max_size) = 0;
49 
53  virtual bool isScanFinished() = 0;
54 
64  virtual size_t readRegion(void* buffer, size_t offset, size_t size) = 0;
65 
69  virtual size_t getRemainingSize() = 0;
70 
74  virtual bool isRemainingSizeKnown() = 0;
84  virtual void checkForMoreRows(size_t file_offset,
85  const ForeignServer* server_options = nullptr,
86  const UserMapping* user_mapping = nullptr) {
87  throw std::runtime_error{"APPEND mode not yet supported for this table."};
88  }
89 
98  virtual void serialize(rapidjson::Value& value,
99  rapidjson::Document::AllocatorType& allocator) const = 0;
100 
101  protected:
103  std::string file_path_;
104 };
105 
106 // Single uncompressed file, that supports FSEEK for faster random access
107 class SingleFileReader : public CsvReader {
108  public:
109  SingleFileReader(const std::string& file_path,
110  const import_export::CopyParams& copy_params);
111  SingleFileReader(const std::string& file_path,
112  const import_export::CopyParams& copy_params,
113  const rapidjson::Value& value);
114  ~SingleFileReader() override { fclose(file_); }
115 
116  // Delete copy assignment to prevent copying resource pointer
117  SingleFileReader(const SingleFileReader&) = delete;
118  SingleFileReader& operator=(const SingleFileReader&) = delete;
119 
120  size_t read(void* buffer, size_t max_size) override {
121  size_t bytes_read = fread(buffer, 1, max_size, file_);
122  if (!scan_finished_) {
123  scan_finished_ = feof(file_);
124  }
125 
126  total_bytes_read_ += bytes_read;
127  return bytes_read;
128  }
129 
130  size_t readRegion(void* buffer, size_t offset, size_t size) override {
132  if (fseek(file_, static_cast<long int>(offset + header_offset_), SEEK_SET) != 0) {
133  throw std::runtime_error{"An error occurred when attempting to read offset " +
134  std::to_string(offset) + " in file: \"" + file_path_ +
135  "\". " + strerror(errno)};
136  }
137  return fread(buffer, 1, size, file_);
138  }
139 
140  bool isScanFinished() override { return scan_finished_; }
141 
142  size_t getRemainingSize() override { return data_size_ - total_bytes_read_; }
143 
144  bool isRemainingSizeKnown() override { return true; };
145  void checkForMoreRows(size_t file_offset,
146  const ForeignServer* server_options,
147  const UserMapping* user_mapping) override;
148 
149  void serialize(rapidjson::Value& value,
150  rapidjson::Document::AllocatorType& allocator) const override;
151 
152  private:
153  std::FILE* file_;
154  // Size of CSV data in file
155  size_t data_size_;
156  // We've reached the end of the file
158 
159  // Size of the CSV header in bytes
161 
163 };
164 
 166  public:
  // ArchiveWrapper (its `class` line is not visible in this listing) owns an Archive
  // through arch_ and tracks the entry being read (current_entry_) and the
  // uncompressed block being consumed (current_block_). Because it holds a
  // std::unique_ptr member, its implicit copy operations are deleted.
  //
  // Constructor: starts with no current block and entry index -1, then calls
  // resetArchive() so that reading begins from the first entry.
  // NOTE(review): no initializer for block_chars_remaining_ is visible here (this
  // listing skips a line inside the initializer list) — confirm it is initialized.
167  ArchiveWrapper(const std::string& file_path)
168  : current_block_(nullptr)
170  , current_entry_(-1)
171  , file_path_(file_path) {
172  resetArchive();
173  }
174
  // Moves the reader to archive entry `entry_number`.
  // NOTE(review): the exact semantics (e.g. whether it may rewind) live in
  // CsvReader.cpp and are not visible here.
178  void skipToEntry(int entry_number);
179
180  // Go to next consecutive entry in archive
  // NOTE(review): the meaning of the bool result is not visible here; presumably
  // false when no further entry exists — confirm.
181  bool nextEntry();
182
  // Returns true when block_chars_remaining_ is 0, i.e. the current block has been
  // fully consumed. Presumably fetchBlock() keeps refilling the block until the
  // entry itself is exhausted — confirm in CsvReader.cpp.
183  bool currentEntryFinished() const { return (block_chars_remaining_ == 0); }
184
186
187  // Consume given amount of data from current block, copying into dest_buffer if set
188  void consumeDataFromCurrentEntry(size_t size, char* dest_buffer = nullptr);
189
190  // Return the next char from the buffer without consuming it
191  char peekNextChar();
192
  // Returns current_entry_. The constructor sets it to -1 before calling
  // resetArchive(), which may change it.
193  int getCurrentEntryIndex() const { return current_entry_; }
194
195  // Reset archive, start reading again from the first entry
196  void resetArchive();
197
  // Forwards to arch_->entryName(); requires arch_ to be non-null (presumably set by
  // resetArchive()).
198  std::string entryName() { return arch_->entryName(); }
199
200  private:
  // NOTE(review): presumably loads the next uncompressed block into current_block_
  // and block_chars_remaining_ — confirm in CsvReader.cpp.
204  void fetchBlock();
205
206  std::unique_ptr<Archive> arch_;
207  // Pointer to current uncompressed block from the archive
208  const void* current_block_;
209  // Number of chars remaining in the current block
  // NOTE(review): the declaration of block_chars_remaining_ (used by
  // currentEntryFinished()) is missing from this listing.
211  // Index of entry of the archive file
  // NOTE(review): the declaration of current_entry_ (returned as int by
  // getCurrentEntryIndex()) is missing from this listing.
213
214  std::string file_path_;
215 };
216 
217 // Single compressed archive (which may hold several entries); no seek-based random access
// CompressedFileReader (its `class` line is not visible in this listing) implements
// the CsvReader interface: its members below are marked `override`.
 219  public:
220  CompressedFileReader(const std::string& file_path,
221  const import_export::CopyParams& copy_params);
  // NOTE(review): presumably restores state written by serialize() — confirm.
222  CompressedFileReader(const std::string& file_path,
223  const import_export::CopyParams& copy_params,
224  const rapidjson::Value& value);
225  size_t read(void* buffer, size_t max_size) override;
226
  // NOTE(review): declared although the archive cannot be seeked; how a region is
  // re-read is defined in CsvReader.cpp and not visible here.
227  size_t readRegion(void* buffer, size_t offset, size_t size) override;
228
  // Returns scan_finished_, whose declaration is not visible in this listing.
229  bool isScanFinished() override { return scan_finished_; }
230
  // The remaining size is not known in advance for compressed input, so
  // getRemainingSize() returns a placeholder 0 that callers must not rely on.
231  bool isRemainingSizeKnown() override { return false; };
232  size_t getRemainingSize() override { return 0; }
233
234  void serialize(rapidjson::Value& value,
235  rapidjson::Document::AllocatorType& allocator) const override;
236
237  private:
  // Reopens the archive; presumably to start reading from its first entry — confirm.
241  void resetArchive();
242
  // NOTE(review): this override of a public virtual is declared private. Calls through
  // a CsvReader pointer/reference still dispatch here, but it cannot be called on a
  // CompressedFileReader directly.
243  void checkForMoreRows(size_t file_offset,
244  const ForeignServer* server_options,
245  const UserMapping* user_mapping) override;
246
  // NOTE(review): redundant second `private:` specifier.
247  private:
  // Presumably advances to the next archive entry — confirm in CsvReader.cpp.
251  void nextEntry();
252
  // Presumably skips the CSV header at the start of the current entry — confirm.
256  void skipHeader();
257
  // Presumably discards n_bytes from the current entry — confirm.
262  void skipBytes(size_t n_bytes);
263
264  // Read bytes in current entry adjusting for EOF
265  size_t readInternal(void* buffer, size_t read_size, size_t buffer_size);
266
  // NOTE(review): the data-member declarations for the comments below (except the
  // vectors) are missing from this listing, as is one declaration just above.
268
269  // Are we doing initial scan or an append
271  // We've reached the end of the last file
273
274  // Overall number of bytes read in the archive (minus headers)
276
277  // Index of current entry in order they appear in
278  // cumulative_sizes_/sourcenames_/archive_entry_index_
280
281  // Size of each file + all previous files
282  std::vector<size_t> cumulative_sizes_;
283  // Names of the file in the archive
284  std::vector<std::string> sourcenames_;
285  // Index of the entry in the archive
286  // Order can change during append operation
287  std::vector<int> archive_entry_index_;
288 };
289 
290 // Combines several archives into single object
291 class MultiFileReader : public CsvReader {
292  public:
293  MultiFileReader(const std::string& file_path,
294  const import_export::CopyParams& copy_params);
295  MultiFileReader(const std::string& file_path,
296  const import_export::CopyParams& copy_params,
297  const rapidjson::Value& value);
298 
299  size_t getRemainingSize() override;
300 
301  bool isRemainingSizeKnown() override;
302 
303  size_t read(void* buffer, size_t max_size) override;
304 
305  size_t readRegion(void* buffer, size_t offset, size_t size) override;
306 
307  bool isScanFinished() override { return (current_index_ >= files_.size()); }
308 
309  void serialize(rapidjson::Value& value,
310  rapidjson::Document::AllocatorType& allocator) const override;
311 
312  protected:
313  std::vector<std::unique_ptr<CsvReader>> files_;
314  std::vector<std::string> file_locations_;
315 
316  // Size of each file + all previous files
317  std::vector<size_t> cumulative_sizes_;
318  // Current file being read
320  // Overall number of bytes read in the directory (minus headers)
322 };
323 
324 // Single file or directory with multiple files
// LocalMultiFileReader: its `class` line (including its base class) is not visible in
// this listing. It presumably derives from MultiFileReader, since it overrides only
// checkForMoreRows() — confirm.
 326  public:
327  LocalMultiFileReader(const std::string& file_path,
328  const import_export::CopyParams& copy_params);
329
  // NOTE(review): presumably restores state written by serialize() — confirm.
330  LocalMultiFileReader(const std::string& file_path,
331  const import_export::CopyParams& copy_params,
332  const rapidjson::Value& value);
333
  // Overrides the CsvReader default, which always throws for APPEND mode; the
  // behaviour of this override is defined in CsvReader.cpp.
334  void checkForMoreRows(size_t file_offset,
335  const ForeignServer* server_options,
336  const UserMapping* user_mapping) override;
337
338  private:
  // Presumably creates a reader for `location` and records it — confirm.
339  void insertFile(std::string location);
340 };
341 
342 } // namespace foreign_storage
std::vector< std::string > sourcenames_
Definition: CsvReader.h:284
std::vector< int > archive_entry_index_
Definition: CsvReader.h:287
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:282
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
CompressedFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:215
virtual size_t read(void *buffer, size_t max_size)=0
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: CsvReader.cpp:273
size_t readInternal(void *buffer, size_t read_size, size_t buffer_size)
Definition: CsvReader.cpp:241
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:126
std::string to_string(char const *&&v)
virtual ~CsvReader()=default
virtual void checkForMoreRows(size_t file_offset, const ForeignServer *server_options=nullptr, const UserMapping *user_mapping=nullptr)
Definition: CsvReader.h:84
void insertFile(std::string location)
Definition: CsvReader.cpp:589
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:462
LocalMultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:529
void consumeDataFromCurrentEntry(size_t size, char *dest_buffer=nullptr)
Definition: CsvReader.cpp:182
virtual void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const =0
bool isRemainingSizeKnown() override
Definition: CsvReader.cpp:521
bool isRemainingSizeKnown() override
Definition: CsvReader.h:144
ArchiveWrapper(const std::string &file_path)
Definition: CsvReader.h:167
bool currentEntryFinished() const
Definition: CsvReader.h:183
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: CsvReader.cpp:661
CsvReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.h:36
size_t getRemainingSize() override
Definition: CsvReader.h:142
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:117
std::unique_ptr< Archive > arch_
Definition: CsvReader.h:206
virtual size_t getRemainingSize()=0
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
SingleFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:69
virtual bool isScanFinished()=0
size_t read(void *buffer, size_t max_size) override
Definition: CsvReader.cpp:644
size_t read(void *buffer, size_t max_size) override
Definition: CsvReader.h:120
size_t readRegion(void *buffer, size_t offset, size_t size) override
Definition: CsvReader.h:130
void skipToEntry(int entry_number)
Definition: CsvReader.cpp:158
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:369
#define CHECK(condition)
Definition: Logger.h:197
size_t currentEntryDataAvailable() const
Definition: CsvReader.h:185
bool isScanFinished() override
Definition: CsvReader.h:307
size_t getRemainingSize() override
Definition: CsvReader.cpp:513
SingleFileReader & operator=(const SingleFileReader &)=delete
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:495
virtual bool isRemainingSizeKnown()=0
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:317
size_t read(void *buffer, size_t max_size) override
Definition: CsvReader.cpp:267
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
Definition: CsvReader.cpp:603