OmniSciDB  340b00dbf6
ParquetDataWrapper.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ParquetDataWrapper.h"

#include <regex>

#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>

#include "Catalog/Catalog.h"
#include "FsiJsonUtils.h"
#include "ImportExport/Importer.h"
#include "LazyParquetChunkLoader.h"
#include "ParquetShared.h"
#include "Utils/DdlUtils.h"

namespace foreign_storage {

namespace {
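// Returns the [begin, end) iterator range of entries in `map` whose ChunkKey begins
// with `chunk_key_prefix`; a sentinel key with INT_MAX appended bounds the range from
// above.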
template <typename T>
std::pair<typename std::map<ChunkKey, T>::iterator,
          typename std::map<ChunkKey, T>::iterator>
prefix_range(std::map<ChunkKey, T>& map, const ChunkKey& chunk_key_prefix) {
  ChunkKey chunk_key_prefix_sentinel = chunk_key_prefix;
  chunk_key_prefix_sentinel.push_back(std::numeric_limits<int>::max());
  auto begin = map.lower_bound(chunk_key_prefix);
  auto end = map.upper_bound(chunk_key_prefix_sentinel);
  return std::make_pair(begin, end);
}

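// Merges the chunk-level metadata of `reduce_from` into `reduce_to`: byte and element
// counts are summed, and min/max/null stats are combined through temporary encoders.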
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;

  // Metadata reduction is done at metadata scan time; both string & geometry
  // columns have no valid stats to reduce beyond `has_nulls`.
  if (column_type.is_string() || column_type.is_geometry()) {
    return;
  }

  ForeignStorageBuffer buffer_to;
  buffer_to.initEncoder(column_type);
  auto encoder_to = buffer_to.getEncoder();
  encoder_to->resetChunkStats(reduce_to->chunkStats);

  ForeignStorageBuffer buffer_from;
  buffer_from.initEncoder(column_type);
  auto encoder_from = buffer_from.getEncoder();
  encoder_from->resetChunkStats(reduce_from->chunkStats);

  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}

}  // namespace

ParquetDataWrapper::ParquetDataWrapper(const int db_id, const ForeignTable* foreign_table)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table)) {
  auto& server_options = foreign_table->foreign_server->options;
  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
      ForeignServer::LOCAL_FILE_STORAGE_TYPE) {
    file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  } else {
    UNREACHABLE();
  }
}

ParquetDataWrapper::ParquetDataWrapper(const ForeignTable* foreign_table)
    : db_id_(-1), foreign_table_(foreign_table) {}

void ParquetDataWrapper::validateOptions(const ForeignTable* foreign_table) {
  for (const auto& entry : foreign_table->options) {
    const auto& table_options = foreign_table->supported_options;
    if (std::find(table_options.begin(), table_options.end(), entry.first) ==
            table_options.end() &&
        std::find(supported_options_.begin(), supported_options_.end(), entry.first) ==
            supported_options_.end()) {
      throw std::runtime_error{"Invalid foreign table option \"" + entry.first + "\"."};
    }
  }
  ParquetDataWrapper data_wrapper{foreign_table};
  data_wrapper.validateAndGetCopyParams();
  data_wrapper.validateFilePath();
}

std::vector<std::string_view> ParquetDataWrapper::getSupportedOptions() {
  return std::vector<std::string_view>{supported_options_.begin(),
                                       supported_options_.end()};
}

void ParquetDataWrapper::validateFilePath() const {
  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
      ForeignServer::LOCAL_FILE_STORAGE_TYPE) {
    ddl_utils::validate_allowed_file_path(getConfiguredFilePath(),
                                          ddl_utils::DataTransferType::IMPORT);
  }
}

void ParquetDataWrapper::resetParquetMetadata() {
  fragment_to_row_group_interval_map_.clear();
  fragment_to_row_group_interval_map_[0] = {};

  last_row_group_ = 0;
  last_fragment_index_ = 0;
  last_fragment_row_count_ = 0;
  total_row_count_ = 0;
}

std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  const auto catalog = Catalog_Namespace::Catalog::checkedGet(db_id_);
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}

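// Binds the supplied buffers to chunks for each column in `column_interval` of the
// given fragment and initializes their encoders. When `reserve_buffers_and_set_stats`
// is true, cached chunk metadata is used to restore encoder stats and reserve buffer
// space ahead of loading.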
void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    std::map<ChunkKey, AbstractBuffer*>& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      auto data_buffer = required_buffers[data_chunk_key];
      CHECK(data_buffer);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      auto index_buffer = required_buffers[index_chunk_key];
      CHECK(index_buffer);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      auto data_buffer = required_buffers[data_chunk_key];
      CHECK(data_buffer);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if (column->columnType.is_string() &&
          column->columnType.get_compression() == kENCODING_NONE) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}

void ParquetDataWrapper::finalizeFragmentMap() {
  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
      last_row_group_;
}

void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  last_fragment_entry->second.back().end_index = last_row_group_;
  last_fragment_index_++;
  last_fragment_row_count_ = 0;
  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
      RowGroupInterval{file_path, row_group});
}

bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
    return true;
  } else {
    return (last_fragment_entry->second.back().file_path != file_path);
  }
}

void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
  } else {
    last_fragment_entry->second.back().end_index = last_row_group_;
  }
  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
}

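// Determines which files still require a metadata scan. In append mode, previously
// processed files must still exist (and a lone file may only grow in row count);
// otherwise all cached metadata is discarded and every file is rescanned.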
void ParquetDataWrapper::fetchChunkMetadata() {
  std::set<std::string> new_file_paths;
  auto processed_file_paths = getProcessedFilePaths();
  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
    auto all_file_paths = getAllFilePaths();
    for (const auto& file_path : processed_file_paths) {
      if (all_file_paths.find(file_path) == all_file_paths.end()) {
        throw_removed_file_error(file_path);
      }
    }

    for (const auto& file_path : all_file_paths) {
      if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
        new_file_paths.emplace(file_path);
      }
    }

    // Single file append
    if (new_file_paths.empty() && all_file_paths.size() == 1) {
      CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
      const auto& file_path = *all_file_paths.begin();
      CHECK_EQ(*processed_file_paths.begin(), file_path);

      std::unique_ptr<parquet::arrow::FileReader> reader;
      open_parquet_table(file_path, reader, file_system_);
      size_t row_count = reader->parquet_reader()->metadata()->num_rows();

      if (row_count < total_row_count_) {
        throw_removed_row_error(file_path);
      } else if (row_count > total_row_count_) {
        new_file_paths = all_file_paths;
        chunk_metadata_map_.clear();
        resetParquetMetadata();
      }
    }
  } else {
    new_file_paths = getAllFilePaths();
    chunk_metadata_map_.clear();
    resetParquetMetadata();
  }

  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}

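// Joins the foreign server's base path with the table's FILE_PATH option, collapsing
// any repeated path separators.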
std::string ParquetDataWrapper::getConfiguredFilePath() const {
  auto& server_options = foreign_table_->foreign_server->options;
  std::string base_path;
  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
      ForeignServer::LOCAL_FILE_STORAGE_TYPE) {
    auto base_path_entry = server_options.find(ForeignServer::BASE_PATH_KEY);
    if (base_path_entry == server_options.end()) {
      throw std::runtime_error{"No base path found in foreign server options."};
    }
    base_path = base_path_entry->second;
  } else {
    UNREACHABLE();
  }

  auto file_path_entry = foreign_table_->options.find("FILE_PATH");
  std::string file_path{};
  if (file_path_entry != foreign_table_->options.end()) {
    file_path = file_path_entry->second;
  }
  const std::string separator{boost::filesystem::path::preferred_separator};
  return std::regex_replace(
      base_path + separator + file_path, std::regex{separator + "{2,}"}, separator);
}

std::set<std::string> ParquetDataWrapper::getProcessedFilePaths() {
  std::set<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      file_paths.emplace(row_group_interval.file_path);
    }
  }
  return file_paths;
}

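// Recursively lists all files under the configured path; if the path cannot be listed
// as a directory, it is treated as a single file.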
std::set<std::string> ParquetDataWrapper::getAllFilePaths() {
  auto timer = DEBUG_TIMER(__func__);
  std::set<std::string> file_paths;
  arrow::fs::FileSelector file_selector{};
  std::string base_path = getConfiguredFilePath();
  file_selector.base_dir = base_path;
  file_selector.recursive = true;

  auto file_info_result = file_system_->GetFileInfo(file_selector);
  if (!file_info_result.ok()) {
    // This is expected when `base_path` points to a single file.
    file_paths.emplace(base_path);
  } else {
    auto& file_info_vector = file_info_result.ValueOrDie();
    for (const auto& file_info : file_info_vector) {
      if (file_info.type() == arrow::fs::FileType::File) {
        file_paths.emplace(file_info.path());
      }
    }
    if (file_paths.empty()) {
      throw std::runtime_error{"No file found at given path \"" + base_path + "\"."};
    }
  }
  return file_paths;
}

import_export::CopyParams ParquetDataWrapper::validateAndGetCopyParams() const {
  import_export::CopyParams copy_params{};
  // The file_type argument is never utilized in the context of FSI;
  // it is set here only for completeness.
  copy_params.file_type = import_export::FileType::PARQUET;
  return copy_params;
}

std::string ParquetDataWrapper::validateAndGetStringWithLength(
    const std::string& option_name,
    const size_t expected_num_chars) const {
  if (auto it = foreign_table_->options.find(option_name);
      it != foreign_table_->options.end()) {
    if (it->second.length() != expected_num_chars) {
      throw std::runtime_error{"Value of \"" + option_name +
                               "\" foreign table option has the wrong number of "
                               "characters. Expected " +
                               std::to_string(expected_num_chars) + " character(s)."};
    }
    return it->second;
  }
  return "";
}

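// Scans row group metadata for the given files, assigns row groups to fragments based
// on the table's maximum fragment row count, and accumulates per-chunk metadata,
// merging into existing entries via reduce_metadata().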
void ParquetDataWrapper::metadataScanFiles(const std::set<std::string>& file_paths) {
  LazyParquetChunkLoader chunk_loader(file_system_);
  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};

  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    CHECK(static_cast<int>(column_chunk_metadata.size()) ==
          schema_->numLogicalAndPhysicalColumns());
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
    int row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    if (moveToNextFragment(import_row_count)) {
      addNewFragment(row_group, file_path);
    } else if (isNewFile(file_path)) {
      CHECK_EQ(row_group, 0);
      addNewFile(file_path);
    }
    last_row_group_ = row_group;

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++, column_chunk_metadata_iter++) {
      CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
      const auto column_descriptor = schema_->getColumnDescriptor(column_id);

      const auto& type_info = column_descriptor->columnType;
      ChunkKey chunk_key{
          db_id_, foreign_table_->tableId, column_id, last_fragment_index_};
      ChunkKey data_chunk_key = chunk_key;
      if (type_info.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
      if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
        chunk_metadata_map_[data_chunk_key] = chunk_metadata;
      } else {
        reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
      }
    }
    last_fragment_row_count_ += import_row_count;
    total_row_count_ += import_row_count;
  }
  finalizeFragmentMap();
}

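// Returns true if appending `new_rows_count` rows would exceed the table's maximum
// number of rows per fragment.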
bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
  return (last_fragment_row_count_ + new_rows_count) >
         static_cast<size_t>(foreign_table_->maxFragRows);
}

void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}

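// Loads the chunks of a logical column (and any associated physical columns) for one
// fragment via LazyParquetChunkLoader, then refreshes the fragmenter's cached chunk
// metadata; min/max stats are taken from the load for dictionary-encoded string and
// geometry columns, whose scan-time stats are placeholders.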
void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    std::map<ChunkKey, AbstractBuffer*>& required_buffers) {
  const auto catalog = Catalog_Namespace::Catalog::checkedGet(db_id_);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);

  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const auto& row_group_intervals = fragment_to_row_group_interval_map_[fragment_id];

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor = catalog->getMetadataForDictUnlocked(
        logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      auto buffer = required_buffers[data_chunk_key];
      CHECK(buffer);
      chunk.setBuffer(buffer);
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      auto index_buffer = required_buffers[index_chunk_key];
      CHECK(index_buffer);
      chunk.setIndexBuffer(index_buffer);
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      auto buffer = required_buffers[chunk_key];
      CHECK(buffer);
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  LazyParquetChunkLoader chunk_loader(file_system_);
  auto metadata = chunk_loader.loadChunk(
      row_group_intervals, parquet_column_index, chunks, string_dictionary);
  auto fragmenter = foreign_table_->fragmenter;
  if (fragmenter) {
    auto metadata_iter = metadata.begin();
    for (int column_id = column_interval.start; column_id <= column_interval.end;
         ++column_id, ++metadata_iter) {
      auto column = schema_->getColumnDescriptor(column_id);
      ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      if (column->columnType.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
      auto cached_metadata = chunk_metadata_map_[data_chunk_key];
      auto updated_metadata = std::make_shared<ChunkMetadata>();
      *updated_metadata = *cached_metadata;
      // For certain types, update the metadata statistics.
      if (is_dictionary_encoded_string_column ||
          logical_column->columnType.is_geometry()) {
        CHECK(metadata_iter != metadata.end());
        auto& chunk_metadata_ptr = *metadata_iter;
        updated_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
        updated_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
      }
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      updated_metadata->numBytes = required_buffers[data_chunk_key]->size();
      fragmenter->updateColumnChunkMetadata(column, fragment_id, updated_metadata);
    }
  }
}

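// Groups the required buffers by logical column and loads each column's chunks for the
// requested fragment.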
void ParquetDataWrapper::populateChunkBuffers(
    std::map<ChunkKey, AbstractBuffer*>& required_buffers,
    std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
  CHECK(!required_buffers.empty());
  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];

  std::set<int> logical_column_ids;
  for (const auto& [chunk_key, buffer] : required_buffers) {
    CHECK_EQ(fragment_id, chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    const auto column_id =
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId;
    logical_column_ids.emplace(column_id);
  }

  for (const auto column_id : logical_column_ids) {
    loadBuffersUsingLazyParquetChunkLoader(column_id, fragment_id, required_buffers);
  }
}

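// JSON (de)serialization helpers for RowGroupInterval, used when persisting and
// restoring the wrapper's fragment-to-row-group mapping.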
void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
}

void ParquetDataWrapper::serializeDataWrapperInternals(
    const std::string& file_path) const {
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, total_row_count_, "total_row_count", d.GetAllocator());

  json_utils::write_to_file(d, file_path);
}

void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
  json_utils::get_value_from_object(
      d, last_fragment_row_count_, "last_fragment_row_count");
  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");

  CHECK(chunk_metadata_map_.empty());
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}

bool ParquetDataWrapper::isRestored() const {
  return is_restored_;
}

}  // namespace foreign_storage