OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "ParquetImporter.h"
17 
18 #include <queue>
19 
20 #include <arrow/filesystem/localfs.h>
21 #include <boost/filesystem.hpp>
22 
24 #include "FsiJsonUtils.h"
25 #include "LazyParquetChunkLoader.h"
26 #include "ParquetShared.h"
27 #include "Shared/misc.h"
29 #include "Utils/DdlUtils.h"
30 
31 namespace foreign_storage {
32 
34  public:
35  RowGroupIntervalTracker(const std::set<std::string>& file_paths,
36  FileReaderMap* file_reader_cache,
37  std::shared_ptr<arrow::fs::FileSystem> file_system)
38  : file_paths_(file_paths)
39  , file_reader_cache_(file_reader_cache)
40  , file_system_(file_system)
42  , num_row_groups_(0)
44  , current_file_iter_(file_paths_.begin()) {}
45 
  // Returns the next interval to load — a single row group of the current
  // file — or an empty optional once all files are exhausted.
  //
  // NOTE(review): this Doxygen listing drops original line 47 (the jump from
  // " 46" to " 48"); upstream presumably advances the tracker state there
  // before the exhaustion check — confirm against the original source before
  // treating this listing as authoritative.
  std::optional<RowGroupInterval> getNextRowGroupInterval() override {
    if (filesAreExhausted()) {
      return {};  // no files left: signal end-of-input to the caller
    }
    // The interval spans exactly one row group: [index, index] of the
    // currently tracked file.
    return RowGroupInterval{
        *current_file_iter_, current_row_group_index_, current_row_group_index_};
  }
54 
55  private:
56  bool filesAreExhausted() { return current_file_iter_ == file_paths_.end(); }
57 
61  return;
62  }
63  if (!is_initialized_) {
65  is_initialized_ = true;
66  } else {
68  current_file_iter_++; // advance iterator
69  }
71  if (filesAreExhausted()) {
72  num_row_groups_ = 0;
73  } else {
74  auto file_reader =
76  num_row_groups_ = file_reader->parquet_reader()->metadata()->num_row_groups();
77  }
78  }
79 
80  std::set<std::string> file_paths_;
82  std::shared_ptr<arrow::fs::FileSystem> file_system_;
83 
87  std::set<std::string>::const_iterator current_file_iter_;
88 };
89 
91  public:
92  ParquetImportBatchResult() = default;
93  ParquetImportBatchResult(const ForeignTable* foreign_table,
94  const int db_id,
95  const ForeignTableSchema* schema);
97 
98  std::optional<Fragmenter_Namespace::InsertData> getInsertData() const override;
100 
101  std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
102  getChunksAndDictionaries() const;
103 
104  void populateInsertData(const std::map<int, Chunk_NS::Chunk>& chunks);
105  void populateImportStatus(const size_t num_rows_completed,
106  const size_t num_rows_rejected);
107 
108  private:
109  std::optional<Fragmenter_Namespace::InsertData> insert_data_;
110  std::map<int, std::unique_ptr<AbstractBuffer>> import_buffers_; // holds data
111 
113  int db_id_;
116 };
117 
118 void ParquetImportBatchResult::populateImportStatus(const size_t num_rows_completed,
119  const size_t num_rows_rejected) {
120  import_status_.rows_completed = num_rows_completed;
121  import_status_.rows_rejected = num_rows_rejected;
122 }
123 
125  const std::map<int, Chunk_NS::Chunk>& chunks) {
127  size_t num_rows = chunks.begin()->second.getBuffer()->getEncoder()->getNumElems();
128  for (const auto& [column_id, chunk] : chunks) {
129  auto column_descriptor = chunk.getColumnDesc();
130  CHECK(chunk.getBuffer()->getEncoder()->getNumElems() == num_rows);
131  insert_data_->columnIds.emplace_back(column_id);
132  auto buffer = chunk.getBuffer();
133  DataBlockPtr block_ptr;
134  if (column_descriptor->columnType.is_array()) {
135  auto array_buffer = dynamic_cast<TypedParquetStorageBuffer<ArrayDatum>*>(buffer);
136  block_ptr.arraysPtr = array_buffer->getBufferPtr();
137  } else if ((column_descriptor->columnType.is_string() &&
138  !column_descriptor->columnType.is_dict_encoded_string()) ||
139  column_descriptor->columnType.is_geometry()) {
140  auto string_buffer = dynamic_cast<TypedParquetStorageBuffer<std::string>*>(buffer);
141  block_ptr.stringsPtr = string_buffer->getBufferPtr();
142  } else {
143  block_ptr.numbersPtr = buffer->getMemoryPtr();
144  }
145  insert_data_->data.emplace_back(block_ptr);
146  }
147  insert_data_->databaseId = db_id_;
148  insert_data_->tableId = foreign_table_->tableId;
149  insert_data_->is_default.assign(insert_data_->columnIds.size(), false);
150  insert_data_->numRows = num_rows;
151 }
152 
153 std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
155  std::map<int, Chunk_NS::Chunk> chunks;
156  std::map<int, StringDictionary*> string_dictionaries;
158 
159  for (const auto column_descriptor : schema_->getLogicalAndPhysicalColumns()) {
160  const bool is_dictionary_encoded_string_column =
161  column_descriptor->columnType.is_dict_encoded_string() ||
162  (column_descriptor->columnType.is_array() &&
163  column_descriptor->columnType.get_elem_type().is_dict_encoded_string());
164 
165  if (is_dictionary_encoded_string_column) {
166  auto dict_descriptor = catalog->getMetadataForDict(
167  column_descriptor->columnType.get_comp_param(), true);
168  CHECK(dict_descriptor);
169  auto string_dictionary = dict_descriptor->stringDict.get();
170  string_dictionaries[column_descriptor->columnId] = string_dictionary;
171  }
172 
173  Chunk_NS::Chunk chunk{column_descriptor};
174  chunk.setBuffer(import_buffers_.at(column_descriptor->columnId).get());
175  if (column_descriptor->columnType.is_varlen_indeed()) {
176  chunk.setIndexBuffer(nullptr); // index buffers are unused
177  }
178  chunk.initEncoder();
179  chunks[column_descriptor->columnId] = chunk;
180  }
181  return {chunks, string_dictionaries};
182 }
183 
185  const int db_id,
186  const ForeignTableSchema* schema)
187  : foreign_table_(foreign_table), db_id_(db_id), schema_(schema) {
188  for (const auto column_descriptor : schema_->getLogicalAndPhysicalColumns()) {
189  if (column_descriptor->columnType.is_array()) {
190  import_buffers_[column_descriptor->columnId] =
191  std::make_unique<TypedParquetStorageBuffer<ArrayDatum>>();
192  } else if ((column_descriptor->columnType.is_string() &&
193  !column_descriptor->columnType.is_dict_encoded_string()) ||
194  column_descriptor->columnType.is_geometry()) {
195  import_buffers_[column_descriptor->columnId] =
196  std::make_unique<TypedParquetStorageBuffer<std::string>>();
197  } else {
198  import_buffers_[column_descriptor->columnId] =
199  std::make_unique<ForeignStorageBuffer>();
200  }
201  }
202 }
203 
204 std::optional<Fragmenter_Namespace::InsertData> ParquetImportBatchResult::getInsertData()
205  const {
206  return insert_data_;
207 }
208 
210  return import_status_;
211 }
212 
213 ParquetImporter::ParquetImporter() : db_id_(-1), foreign_table_(nullptr) {}
214 
216  const ForeignTable* foreign_table,
217  const UserMapping* user_mapping)
218  : db_id_(db_id)
219  , foreign_table_(foreign_table)
220  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
221  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
222  auto& server_options = foreign_table->foreign_server->options;
223  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
224  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
225  } else {
226  UNREACHABLE();
227  }
228 }
229 
230 std::set<std::string> ParquetImporter::getAllFilePaths() {
231  auto timer = DEBUG_TIMER(__func__);
232  std::set<std::string> file_paths;
233  auto file_path = getFullFilePath(foreign_table_);
234  auto file_info_result = file_system_->GetFileInfo(file_path);
235  if (!file_info_result.ok()) {
236  throw_file_access_error(file_path, file_info_result.status().message());
237  } else {
238  auto& file_info = file_info_result.ValueOrDie();
239  if (file_info.type() == arrow::fs::FileType::NotFound) {
240  throw_file_not_found_error(file_path);
241  } else if (file_info.type() == arrow::fs::FileType::File) {
242  file_paths.emplace(file_path);
243  } else {
244  CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
245  arrow::fs::FileSelector file_selector{};
246  file_selector.base_dir = file_path;
247  file_selector.recursive = true;
248  auto selector_result = file_system_->GetFileInfo(file_selector);
249  if (!selector_result.ok()) {
250  throw_file_access_error(file_path, selector_result.status().message());
251  } else {
252  auto& file_info_vector = selector_result.ValueOrDie();
253  for (const auto& file_info : file_info_vector) {
254  if (file_info.type() == arrow::fs::FileType::File) {
255  file_paths.emplace(file_info.path());
256  }
257  }
258  }
259  }
260  }
261  return file_paths;
262 }
263 
265  UNREACHABLE();
266 }
267 
269  const ChunkToBufferMap& optional_buffers) {
270  UNREACHABLE();
271 }
272 
274  UNREACHABLE();
275  return {};
276 }
277 
278 std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>
281 }
282 
// Produces one batch of imported data: lazily sets up the row-group tracker,
// builds a fresh ParquetImportBatchResult, loads the next row group into its
// chunks, and records row counts. When no row group remains, the result is
// returned without populated data, which signals end-of-import to the caller.
//
// NOTE(review): this Doxygen listing drops original lines 284, 286 and 299 —
// apparently the lazy-init guard around the tracker construction, the
// tracker's argument list, and the declaration of `chunk_loader` used below.
// Reconcile with the original ParquetImporter.cpp before relying on this
// listing; as shown here the fragment is not compilable.
std::unique_ptr<import_export::ImportBatchResult> ParquetImporter::getNextImportBatch() {
  row_group_interval_tracker_ = std::make_unique<RowGroupIntervalTracker>(
  }

  auto import_batch_result =
      std::make_unique<ParquetImportBatchResult>(foreign_table_, db_id_, schema_.get());
  auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
  // Cache the (column descriptor, dictionary) pairs on first use only.
  if (!string_dictionaries_per_column_.size()) {
    for (const auto& [column_id, dict] : string_dictionaries) {
      string_dictionaries_per_column_.emplace_back(chunks[column_id].getColumnDesc(),
                                                   dict);
    }
  }

  auto next_row_group = row_group_interval_tracker_->getNextRowGroupInterval();
  size_t num_rows_completed, num_rows_rejected;
  if (next_row_group.has_value()) {
    std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
        *next_row_group, chunks, *schema_, string_dictionaries);
  } else {
    return import_batch_result;  // terminate without populating data, read the last row
                                 // group
  }

  import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
  import_batch_result->populateInsertData(chunks);

  return import_batch_result;
}
316 
318  const std::string& file_path,
319  const ChunkMetadataVector& chunk_metadata_vector) {
320  UNREACHABLE();
321 }
322 
324  UNREACHABLE();
325  return {};
326 }
327 
328 } // namespace foreign_storage
std::set< std::string > getAllFilePaths()
#define CHECK_EQ(x, y)
Definition: Logger.h:217
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
void populateImportStatus(const size_t num_rows_completed, const size_t num_rows_rejected)
std::pair< std::map< int, Chunk_NS::Chunk >, std::map< int, StringDictionary * > > getChunksAndDictionaries() const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
std::unique_ptr< ForeignTableSchema > schema_
void throw_file_access_error(const std::string &file_path, const std::string &message)
std::optional< Fragmenter_Namespace::InsertData > insert_data_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
#define UNREACHABLE()
Definition: Logger.h:253
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::optional< RowGroupInterval > getNextRowGroupInterval() override
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > getStringDictionaries() const
void throw_file_not_found_error(const std::string &file_path)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:111
std::set< std::string >::const_iterator current_file_iter_
static SysCatalog & instance()
Definition: SysCatalog.h:325
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::map< int, std::unique_ptr< AbstractBuffer > > import_buffers_
std::optional< Fragmenter_Namespace::InsertData > getInsertData() const override
std::string getSerializedDataWrapper() const override
void populateInsertData(const std::map< int, Chunk_NS::Chunk > &chunks)
const ForeignTable * foreign_table_
RowGroupIntervalTracker(const std::set< std::string > &file_paths, FileReaderMap *file_reader_cache, std::shared_ptr< arrow::fs::FileSystem > file_system)
import_export::ImportStatus getImportStatus() const override
std::unique_ptr< FileReaderMap > file_reader_cache_
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
std::shared_ptr< arrow::fs::FileSystem > file_system_
int8_t * numbersPtr
Definition: sqltypes.h:226
std::unique_ptr< import_export::ImportBatchResult > getNextImportBatch()
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
import_export::ImportStatus import_status_
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_