OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetImporter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ParquetImporter.h"
18 
19 #include <queue>
20 
21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
23 
24 #include "Catalog/Catalog.h"
25 #include "Catalog/ForeignTable.h"
26 #include "DataMgr/Chunk/Chunk.h"
28 #include "FsiJsonUtils.h"
29 #include "LazyParquetChunkLoader.h"
30 #include "ParquetShared.h"
31 #include "Shared/misc.h"
33 #include "Utils/DdlUtils.h"
34 
35 namespace foreign_storage {
36 
 38  public:
 // Hands out Parquet row groups one at a time, across all files, to
 // concurrent import batches. NOTE(review): this listing is missing several
 // original source lines (37, 45, 47, 51, 61-64, 68, 76, 81, 87, 90-92), so
 // parts of the class (including one method's signature) are not visible.
 39  RowGroupIntervalTracker(const std::set<std::string>& file_paths,
 40  FileReaderMap* file_reader_cache,
 41  std::shared_ptr<arrow::fs::FileSystem> file_system)
 42  : file_paths_(file_paths)
 43  , file_reader_cache_(file_reader_cache)
 44  , file_system_(file_system)
 46  , num_row_groups_(0)
 48  , current_file_iter_(file_paths_.begin()) {}
 49 
 // Returns the next interval to load, or an empty optional when every file
 // has been consumed.
 50  std::optional<RowGroupInterval> getNextRowGroupInterval() override {
 52  if (filesAreExhausted()) {
 53  return {};
 54  }
 // The interval covers exactly one row group: [current, current].
 55  return RowGroupInterval{
 56  *current_file_iter_, current_row_group_index_, current_row_group_index_};
 57  }
 58 
 59  private:
 // True once the file iterator has walked past the last file path.
 60  bool filesAreExhausted() { return current_file_iter_ == file_paths_.end(); }
 61 
 // Body of a file-advancing helper whose signature (lines 61-64) is missing
 // from this listing.
 65  return;
 66  }
 67  if (!is_initialized_) {
 69  is_initialized_ = true;
 70  } else {
 71  if (filesAreExhausted()) { // can be possible if many concurrent requests
 72  return;
 73  }
 74  current_file_iter_++; // advance iterator
 75  }
 // Refresh the cached row-group count for the newly selected file; zero when
 // there is no file left.
 77  if (filesAreExhausted()) {
 78  num_row_groups_ = 0;
 79  } else {
 80  auto file_reader =
 82  num_row_groups_ = file_reader->parquet_reader()->metadata()->num_row_groups();
 83  }
 84  }
 85 
 86  std::set<std::string> file_paths_;
 88  std::shared_ptr<arrow::fs::FileSystem> file_system_;
 89 
 93  std::set<std::string>::const_iterator current_file_iter_;
 94 };
95 
 97  public:
 98  ParquetImportBatchResult() = default;
 // Builds a batch result for the given table/schema; the constructor body
 // (defined below) allocates one staging buffer per column.
 99  ParquetImportBatchResult(const ForeignTable* foreign_table,
100  const int db_id,
101  const ForeignTableSchema* schema);
103 
 // Data staged by populateInsertData(), or an empty optional if it never ran.
104  std::optional<Fragmenter_Namespace::InsertData> getInsertData() const override;
106 
 // Creates per-column Chunks wired to import_buffers_, plus the
 // StringDictionary for each dictionary-encoded column.
107  std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
108  getChunksAndDictionaries() const;
109 
110  void populateInsertData(const std::map<int, Chunk_NS::Chunk>& chunks);
111  void populateImportStatus(const size_t num_rows_completed,
112  const size_t num_rows_rejected);
113 
 // NOTE(review): lines 96, 102, 105, 118, 120-121 of the original class are
 // missing from this listing.
114  private:
115  std::optional<Fragmenter_Namespace::InsertData> insert_data_;
116  std::map<int, std::unique_ptr<AbstractBuffer>> import_buffers_; // holds data
117 
119  int db_id_;
122 };
123 
124 void ParquetImportBatchResult::populateImportStatus(const size_t num_rows_completed,
125  const size_t num_rows_rejected) {
126  import_status_.rows_completed = num_rows_completed;
127  import_status_.rows_rejected = num_rows_rejected;
128 }
129 
// Fills insert_data_ from the given chunks: one DataBlockPtr per column,
// pointing into that chunk's staging buffer. All chunks must hold the same
// row count. NOTE(review): lines 130 (signature start) and 132 are missing
// from this listing; 132 presumably (re)initializes insert_data_ before the
// insert_data_-> accesses below — verify against the full source.
131  const std::map<int, Chunk_NS::Chunk>& chunks) {
// Row count is taken from the first chunk; the CHECK below enforces that
// every other chunk agrees.
133  size_t num_rows = chunks.begin()->second.getBuffer()->getEncoder()->getNumElems();
134  for (const auto& [column_id, chunk] : chunks) {
135  auto column_descriptor = chunk.getColumnDesc();
136  CHECK(chunk.getBuffer()->getEncoder()->getNumElems() == num_rows);
137  insert_data_->columnIds.emplace_back(column_id);
138  auto buffer = chunk.getBuffer();
139  DataBlockPtr block_ptr;
// Arrays are staged in a typed ArrayDatum buffer.
140  if (column_descriptor->columnType.is_array()) {
141  auto array_buffer = dynamic_cast<TypedParquetStorageBuffer<ArrayDatum>*>(buffer);
142  block_ptr.arraysPtr = array_buffer->getBufferPtr();
// None-encoded strings and geometry are staged as std::string values.
143  } else if ((column_descriptor->columnType.is_string() &&
144  !column_descriptor->columnType.is_dict_encoded_string()) ||
145  column_descriptor->columnType.is_geometry()) {
146  auto string_buffer = dynamic_cast<TypedParquetStorageBuffer<std::string>*>(buffer);
147  block_ptr.stringsPtr = string_buffer->getBufferPtr();
// Everything else (numerics, dict-encoded strings, ...) is raw memory.
148  } else {
149  block_ptr.numbersPtr = buffer->getMemoryPtr();
150  }
151  insert_data_->data.emplace_back(block_ptr);
152  }
153  insert_data_->databaseId = db_id_;
154  insert_data_->tableId = foreign_table_->tableId;
// No defaulted columns in this path: every column carries explicit data.
155  insert_data_->is_default.assign(insert_data_->columnIds.size(), false);
156  insert_data_->numRows = num_rows;
157 }
158 
// Creates a Chunk per logical and physical column, backed by this result's
// import_buffers_, and collects the StringDictionary for each
// dictionary-encoded (scalar or array-element) column. NOTE(review): lines
// 160 and 163 are missing from this listing; 163 presumably obtains the
// `catalog` used below — verify against the full source.
159 std::pair<std::map<int, Chunk_NS::Chunk>, std::map<int, StringDictionary*>>
161  std::map<int, Chunk_NS::Chunk> chunks;
162  std::map<int, StringDictionary*> string_dictionaries;
164 
165  for (const auto column_descriptor : schema_->getLogicalAndPhysicalColumns()) {
166  const bool is_dictionary_encoded_string_column =
167  column_descriptor->columnType.is_dict_encoded_string() ||
168  (column_descriptor->columnType.is_array() &&
169  column_descriptor->columnType.get_elem_type().is_dict_encoded_string());
170 
171  if (is_dictionary_encoded_string_column) {
// The dictionary must already exist in the catalog for this column.
172  auto dict_descriptor = catalog->getMetadataForDict(
173  column_descriptor->columnType.get_comp_param(), true);
174  CHECK(dict_descriptor);
175  auto string_dictionary = dict_descriptor->stringDict.get();
176  string_dictionaries[column_descriptor->columnId] = string_dictionary;
177  }
178 
179  Chunk_NS::Chunk chunk{column_descriptor};
180  chunk.setBuffer(import_buffers_.at(column_descriptor->columnId).get());
181  if (column_descriptor->columnType.is_varlen_indeed()) {
182  chunk.setIndexBuffer(nullptr); // index buffers are unused
183  }
184  chunk.initEncoder();
185  chunks[column_descriptor->columnId] = chunk;
186  }
187  return {chunks, string_dictionaries};
188 }
189 
// ParquetImportBatchResult constructor (first signature line, 190, is
// missing from this listing). Allocates one staging buffer per column:
// typed ArrayDatum buffers for arrays, std::string buffers for none-encoded
// strings and geometry, and a plain ForeignStorageBuffer otherwise.
191  const int db_id,
192  const ForeignTableSchema* schema)
193  : foreign_table_(foreign_table), db_id_(db_id), schema_(schema) {
194  for (const auto column_descriptor : schema_->getLogicalAndPhysicalColumns()) {
195  if (column_descriptor->columnType.is_array()) {
196  import_buffers_[column_descriptor->columnId] =
197  std::make_unique<TypedParquetStorageBuffer<ArrayDatum>>();
// Same type dispatch as populateInsertData(): keep the two in sync.
198  } else if ((column_descriptor->columnType.is_string() &&
199  !column_descriptor->columnType.is_dict_encoded_string()) ||
200  column_descriptor->columnType.is_geometry()) {
201  import_buffers_[column_descriptor->columnId] =
202  std::make_unique<TypedParquetStorageBuffer<std::string>>();
203  } else {
204  import_buffers_[column_descriptor->columnId] =
205  std::make_unique<ForeignStorageBuffer>();
206  }
207  }
208 }
209 
210 std::optional<Fragmenter_Namespace::InsertData> ParquetImportBatchResult::getInsertData()
211  const {
212  return insert_data_;
213 }
214 
// Returns the accumulated import status for this batch (the enclosing
// signature, line 215, is missing from this listing).
216  return import_status_;
217 }
218 
// Delegates to the table schema's logical column count (the enclosing
// signature, line 219, is missing from this listing).
220  return schema_->numLogicalColumns();
221 }
222 
223 void ParquetImporter::setNumThreads(const int num_threads) {
224  num_threads_ = num_threads;
225 }
226 
// Default importer: invalid database id (-1), no table, single-threaded
// (the signature line, 227, is missing from this listing).
228  : db_id_(-1), foreign_table_(nullptr), num_threads_(1) {}
229 
// ParquetImporter constructor (first signature line, 230, is missing from
// this listing). Builds the foreign-table schema and the shared file-reader
// cache, then selects the filesystem; only local-file storage is supported
// on this path.
231  const ForeignTable* foreign_table,
232  const UserMapping* user_mapping)
233  : db_id_(db_id)
234  , foreign_table_(foreign_table)
235  , num_threads_(1)
236  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
237  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
238  auto& server_options = foreign_table->foreign_server->options;
// NOTE(review): assumes STORAGE_TYPE_KEY is always present in the server
// options (find() result dereferenced unchecked) — confirm with callers.
239  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
240  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
241  } else {
242  UNREACHABLE();
243  }
244 }
245 
246 std::set<std::string> ParquetImporter::getAllFilePaths() {
247  auto timer = DEBUG_TIMER(__func__);
248  std::set<std::string> file_paths;
249  auto file_path = getFullFilePath(foreign_table_);
250  arrow::Result<arrow::fs::FileInfo> file_info_result;
251  arrow::Result<std::vector<arrow::fs::FileInfo>> selector_result;
252  {
253  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-file_info");
254  file_info_result = file_system_->GetFileInfo(file_path);
255  }
256  if (!file_info_result.ok()) {
257  throw_file_access_error(file_path, file_info_result.status().message());
258  } else {
259  auto& file_info = file_info_result.ValueOrDie();
260  if (file_info.type() == arrow::fs::FileType::NotFound) {
261  throw_file_not_found_error(file_path);
262  } else if (file_info.type() == arrow::fs::FileType::File) {
263  file_paths.emplace(file_path);
264  } else {
265  CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
266  arrow::fs::FileSelector file_selector{};
267  file_selector.base_dir = file_path;
268  file_selector.recursive = true;
269  {
270  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-selector");
271  selector_result = file_system_->GetFileInfo(file_selector);
272  }
273  if (!selector_result.ok()) {
274  throw_file_access_error(file_path, selector_result.status().message());
275  } else {
276  auto& file_info_vector = selector_result.ValueOrDie();
277  for (const auto& file_info : file_info_vector) {
278  if (file_info.type() == arrow::fs::FileType::File) {
279  file_paths.emplace(file_info.path());
280  }
281  }
282  }
283  }
284  }
285  return file_paths;
286 }
287 
// Unimplemented for the import-only Parquet wrapper (the signature line,
// 288, is missing from this listing).
289  UNREACHABLE();
290 }
291 
// populateChunkBuffers is unimplemented for the import-only Parquet wrapper
// (the first signature line, 292, is missing from this listing).
293  const ChunkToBufferMap& optional_buffers,
294  AbstractBuffer* delete_buffer) {
295  UNREACHABLE();
296 }
297 
// Unimplemented; returns an empty value after UNREACHABLE() (the signature
// line, 298, is missing from this listing).
299  UNREACHABLE();
300  return {};
301 }
302 
// getStringDictionaries — its body (lines 304-305) is missing from this
// listing; presumably it returns string_dictionaries_per_column_, which
// getNextImportBatch() populates on the first batch. Verify against the
// full source.
303 std::vector<std::pair<const ColumnDescriptor*, StringDictionary*>>
306 }
307 
// Produces the next batch of imported data: lazily creates the shared
// row-group tracker, builds a fresh batch result with per-column chunks,
// then loads exactly one row group into those chunks. Returns an
// unpopulated result once all row groups are exhausted — callers detect
// completion via an empty getInsertData(). NOTE(review): lines 311 and 313
// are missing from this listing (the lazy-init condition and the tracker's
// constructor arguments).
308 std::unique_ptr<import_export::ImportBatchResult> ParquetImporter::getNextImportBatch() {
309  {
310  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
312  row_group_interval_tracker_ = std::make_unique<RowGroupIntervalTracker>(
314  }
315  }
316 
317  auto import_batch_result =
318  std::make_unique<ParquetImportBatchResult>(foreign_table_, db_id_, schema_.get());
319  auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
320 
// Publish the column -> dictionary mapping once, on the first batch only.
321  {
322  std::unique_lock string_dictionaries_per_column_lock(
324  if (!string_dictionaries_per_column_.size()) {
325  for (const auto& [column_id, dict] : string_dictionaries) {
326  string_dictionaries_per_column_.emplace_back(chunks[column_id].getColumnDesc(),
327  dict);
328  }
329  }
330  }
331 
332  // this code path is deprecated and does not need a RenderGroupAnalyzerMap
333  LazyParquetChunkLoader chunk_loader(
334  file_system_, file_reader_cache_.get(), nullptr, foreign_table_->tableName);
335 
// Claim the next row group under the tracker lock; concurrent callers each
// receive a distinct interval.
336  std::optional<RowGroupInterval> next_row_group;
337  {
338  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
339  next_row_group = row_group_interval_tracker_->getNextRowGroupInterval();
340  }
341  size_t num_rows_completed, num_rows_rejected;
342  if (next_row_group.has_value()) {
343  std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
344  *next_row_group, chunks, *schema_, string_dictionaries, num_threads_);
345  } else {
346  return import_batch_result; // terminate without populating data, read the last row
347  // group
348  }
349 
350  import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
351  import_batch_result->populateInsertData(chunks);
352 
353  return import_batch_result;
354 }
355 
// restoreDataWrapperInternals is unimplemented for the import-only Parquet
// wrapper (the first signature line, 356, is missing from this listing).
357  const std::string& file_path,
358  const ChunkMetadataVector& chunk_metadata_vector) {
359  UNREACHABLE();
360 }
361 
// Unimplemented; returns an empty value after UNREACHABLE() (the signature
// line, 362, is missing from this listing).
363  UNREACHABLE();
364  return {};
365 }
366 
367 } // namespace foreign_storage
std::set< std::string > getAllFilePaths()
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::shared_mutex row_group_interval_tracker_mutex_
void populateImportStatus(const size_t num_rows_completed, const size_t num_rows_rejected)
std::pair< std::map< int, Chunk_NS::Chunk >, std::map< int, StringDictionary * > > getChunksAndDictionaries() const
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:224
void setNumThreads(const int num_threads)
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:225
std::unique_ptr< ForeignTableSchema > schema_
void throw_file_access_error(const std::string &file_path, const std::string &message)
std::optional< Fragmenter_Namespace::InsertData > insert_data_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
#define UNREACHABLE()
Definition: Logger.h:337
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::shared_mutex string_dictionaries_per_column_mutex_
std::optional< RowGroupInterval > getNextRowGroupInterval() override
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > getStringDictionaries() const
void throw_file_not_found_error(const std::string &file_path)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:150
This file contains the class specification and related data structures for Catalog.
std::set< std::string >::const_iterator current_file_iter_
static SysCatalog & instance()
Definition: SysCatalog.h:343
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:70
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::unique_lock< T > unique_lock
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::map< int, std::unique_ptr< AbstractBuffer > > import_buffers_
std::optional< Fragmenter_Namespace::InsertData > getInsertData() const override
std::string getSerializedDataWrapper() const override
void populateInsertData(const std::map< int, Chunk_NS::Chunk > &chunks)
const ForeignTable * foreign_table_
RowGroupIntervalTracker(const std::set< std::string > &file_paths, FileReaderMap *file_reader_cache, std::shared_ptr< arrow::fs::FileSystem > file_system)
import_export::ImportStatus getImportStatus() const override
std::unique_ptr< FileReaderMap > file_reader_cache_
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
bool g_enable_watchdog false
Definition: Execute.cpp:79
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
std::shared_ptr< arrow::fs::FileSystem > file_system_
int8_t * numbersPtr
Definition: sqltypes.h:223
std::unique_ptr< import_export::ImportBatchResult > getNextImportBatch()
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
import_export::ImportStatus import_status_
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_