ParquetDataWrapper.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ParquetDataWrapper.h"

#include <queue>

#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>

#include "Catalog/Catalog.h"
#include "FsiChunkUtils.h"
#include "FsiJsonUtils.h"
#include "LazyParquetChunkLoader.h"
#include "ParquetShared.h"
#include "Shared/SysDefinitions.h"
#include "Shared/file_path_util.h"
#include "Shared/misc.h"
#include "Utils/DdlUtils.h"

namespace foreign_storage {

namespace {
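// Merges the chunk metadata of `reduce_from` into the running aggregate
// `reduce_to`: numBytes and numElements are summed, has_nulls is OR'd, and
// min/max stats are merged according to the column's type family (see below).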
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;

  // Metadata reduction is done at metadata scan time; string and geometry
  // columns have no valid stats to reduce beyond `has_nulls`.
  if (column_type.is_string() || column_type.is_geometry()) {
    // Reset to an invalid range, as formerly valid metadata
    // needs to be invalidated during an append for these types.
    reduce_to->chunkStats.max = reduce_from->chunkStats.max;
    reduce_to->chunkStats.min = reduce_from->chunkStats.min;
    return;
  }

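  // For all other types, merge min/max by loading each side's stats into a
  // scratch encoder, reducing one into the other, and then reading the
  // combined stats back out of the resulting metadata.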
  ForeignStorageBuffer buffer_to;
  buffer_to.initEncoder(column_type);
  auto encoder_to = buffer_to.getEncoder();
  encoder_to->resetChunkStats(reduce_to->chunkStats);

  ForeignStorageBuffer buffer_from;
  buffer_from.initEncoder(column_type);
  auto encoder_from = buffer_from.getEncoder();
  encoder_from->resetChunkStats(reduce_from->chunkStats);

  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}
}  // namespace

ParquetDataWrapper::ParquetDataWrapper()
    : do_metadata_stats_validation_(true), db_id_(-1), foreign_table_(nullptr) {}

ParquetDataWrapper::ParquetDataWrapper(const ForeignTable* foreign_table,
                                       std::shared_ptr<arrow::fs::FileSystem> file_system)
    : do_metadata_stats_validation_(false)
    , db_id_(-1)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , file_system_(file_system)
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {}

ParquetDataWrapper::ParquetDataWrapper(const int db_id,
                                       const ForeignTable* foreign_table,
                                       const bool do_metadata_stats_validation)
    : do_metadata_stats_validation_(do_metadata_stats_validation)
    , db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {
  auto& server_options = foreign_table->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  } else {
    UNREACHABLE();
  }
}
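
// Resets all metadata-scan bookkeeping (fragment-to-row-group map, row-group
// cursor, row counts, and the cached file readers) so that a subsequent
// metadata scan starts from a clean slate.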
void ParquetDataWrapper::resetParquetMetadata() {
  fragment_to_row_group_interval_map_.clear();
  fragment_to_row_group_interval_map_[0] = {};

  last_row_group_ = 0;
  last_fragment_index_ = 0;
  last_fragment_row_count_ = 0;
  total_row_count_ = 0;
  file_reader_cache_->clear();
}

std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}

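// Sets up a Chunk over each buffer in `required_buffers` for the given column
// range. Variable-length columns use two buffers per chunk: a data buffer
// (chunk key suffix 1) and an offsets/index buffer (suffix 2); fixed-length
// columns use a single data buffer with a four-component chunk key.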
void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    const ChunkToBufferMap& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column, false};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if ((column->columnType.is_string() &&
           column->columnType.get_compression() == kENCODING_NONE) ||
          column->columnType.is_geometry()) {
        // non-dictionary string or geometry WKT string
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}

void ParquetDataWrapper::finalizeFragmentMap() {
  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
      last_row_group_;
}

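// The wrapper tracks which Parquet row groups feed each table fragment via
// fragment_to_row_group_interval_map_. addNewFragment() closes the current
// fragment's last interval and starts a new fragment at `row_group`;
// addNewFile() starts a new interval (at row group 0) within the current
// fragment when the scan crosses a file boundary.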
void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  last_fragment_entry->second.back().end_index = last_row_group_;
  last_fragment_index_++;
  last_fragment_row_count_ = 0;
  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
      RowGroupInterval{file_path, row_group});
}

bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector.
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
    return true;
  } else {
    return (last_fragment_entry->second.back().file_path != file_path);
  }
}

void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector.
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
  } else {
    last_fragment_entry->second.back().end_index = last_row_group_;
  }
  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
}
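
// Determines which files are new since the last metadata scan and ingests
// their metadata. In append mode, a previously scanned file that disappears
// or shrinks raises an error, while growth of a single file triggers a full
// rescan of all files.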
void ParquetDataWrapper::fetchChunkMetadata() {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  std::vector<std::string> new_file_paths;
  auto processed_file_paths = getProcessedFilePaths();
  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
    auto all_file_paths = getAllFilePaths();
    for (const auto& file_path : processed_file_paths) {
      if (!shared::contains(all_file_paths, file_path)) {
        throw_removed_file_error(file_path);
      }
    }

    for (const auto& file_path : all_file_paths) {
      if (!shared::contains(processed_file_paths, file_path)) {
        new_file_paths.emplace_back(file_path);
      }
    }

    // Single file append
    // If an append occurs with multiple files, then we assume any existing files
    // have not been altered. If an append occurs on a single file, then we check
    // to see if it has changed.
    if (new_file_paths.empty() && all_file_paths.size() == 1) {
      CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
      const auto& file_path = *all_file_paths.begin();
      CHECK_EQ(*processed_file_paths.begin(), file_path);

      // Since an existing file is being appended to, we need to update the cached
      // FileReader, as the existing one will be out of date.
      auto reader = file_reader_cache_->insert(file_path, file_system_);
      size_t row_count = reader->parquet_reader()->metadata()->num_rows();

      if (row_count < total_row_count_) {
        throw_removed_row_in_file_error(file_path);
      } else if (row_count > total_row_count_) {
        new_file_paths = all_file_paths;
        chunk_metadata_map_.clear();
        resetParquetMetadata();
      }
    }
  } else {
    CHECK(chunk_metadata_map_.empty());
    new_file_paths = getAllFilePaths();
    resetParquetMetadata();
  }

  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}

std::set<std::string> ParquetDataWrapper::getProcessedFilePaths() {
  std::set<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      file_paths.emplace(row_group_interval.file_path);
    }
  }
  return file_paths;
}

std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
  auto timer = DEBUG_TIMER(__func__);
  std::vector<std::string> found_file_paths;
  auto file_path = getFullFilePath(foreign_table_);
  const auto& regex_pattern = foreign_table_->getOption(REGEX_PATH_FILTER_KEY);
  const auto& sort_by = foreign_table_->getOption(FILE_SORT_ORDER_BY_KEY);
  const auto& sort_regex = foreign_table_->getOption(FILE_SORT_REGEX_KEY);

  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    found_file_paths = shared::local_glob_filter_sort_files(
        file_path, regex_pattern, sort_by, sort_regex);
  } else {
    UNREACHABLE();
  }
  return found_file_paths;
}
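
// Scans the row-group metadata of each file in order, packing row groups into
// fragments of at most foreign_table_->maxFragRows rows and reducing the
// per-row-group column stats into chunk_metadata_map_ as it goes.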
void ParquetDataWrapper::metadataScanFiles(const std::vector<std::string>& file_paths) {
  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get(), nullptr);
  auto row_group_metadata =
      chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};

  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    CHECK(static_cast<int>(column_chunk_metadata.size()) ==
          schema_->numLogicalAndPhysicalColumns());
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
    int row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    if (moveToNextFragment(import_row_count)) {
      addNewFragment(row_group, file_path);
    } else if (isNewFile(file_path)) {
      CHECK_EQ(row_group, 0);
      addNewFile(file_path);
    }
    last_row_group_ = row_group;

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++, column_chunk_metadata_iter++) {
      CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
      const auto column_descriptor = schema_->getColumnDescriptor(column_id);

      const auto& type_info = column_descriptor->columnType;
      ChunkKey chunk_key{
          db_id_, foreign_table_->tableId, column_id, last_fragment_index_};
      ChunkKey data_chunk_key = chunk_key;
      if (type_info.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
      if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
        chunk_metadata_map_[data_chunk_key] = chunk_metadata;
      } else {
        reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
      }
    }
    last_fragment_row_count_ += import_row_count;
    total_row_count_ += import_row_count;
  }
  finalizeFragmentMap();
}

bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
  return (last_fragment_row_count_ + new_rows_count) >
         static_cast<size_t>(foreign_table_->maxFragRows);
}

void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}
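
// Loads every physical chunk of one logical column for one fragment into the
// supplied buffers. Rows rejected by the chunk loader are flagged in the
// shared `delete_buffer`.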
void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap& required_buffers,
    AbstractBuffer* delete_buffer) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);

  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const auto& row_group_intervals =
      shared::get_from_map(fragment_to_row_group_interval_map_, fragment_id);

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor =
        catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor, false};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(buffer);
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      auto buffer = shared::get_from_map(required_buffers, chunk_key);
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
  if (delete_buffer) {
    rejected_row_indices = std::make_unique<RejectedRowIndices>();
  }
  LazyParquetChunkLoader chunk_loader(
      file_system_, file_reader_cache_.get(), &render_group_analyzer_map_);
  auto metadata = chunk_loader.loadChunk(row_group_intervals,
                                         parquet_column_index,
                                         chunks,
                                         string_dictionary,
                                         rejected_row_indices.get());

  if (delete_buffer) {
    // All modifying operations on `delete_buffer` must be synchronized, as it
    // is a shared buffer.
    std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);

    CHECK(!chunks.empty());
    CHECK(chunks.begin()->getBuffer()->hasEncoder());
    auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();

    // Ensure the delete buffer is sized appropriately.
    if (delete_buffer->size() < num_rows_in_chunk) {
      auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
      std::vector<int8_t> data(remaining_rows, false);
      delete_buffer->append(data.data(), remaining_rows);
    }

    // Compute a logical OR of the current `delete_buffer` contents and this
    // chunk's rejected indices.
    CHECK(rejected_row_indices);
    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto& rejected_index : *rejected_row_indices) {
      CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
      delete_buffer_data[rejected_index] = true;
    }
  }

  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());

    // Allocate a new shared_ptr for the metadata so we don't modify the old
    // one, which may be in use by the executor.
    auto cached_metadata_previous =
        shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
        std::make_shared<ChunkMetadata>();
    auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    *cached_metadata = *cached_metadata_previous;

    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes =
        shared::get_from_map(required_buffers, data_chunk_key)->size();

    // For certain types, the metadata statistics update should apply to both
    // the cache and the internal chunk_metadata_map_.
    if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
      CHECK(metadata_iter != metadata.end());
      auto& chunk_metadata_ptr = *metadata_iter;
      cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
      cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;

      // Update stats on the buffer so they are saved in the cache.
      shared::get_from_map(required_buffers, data_chunk_key)
          ->getEncoder()
          ->resetChunkStats(cached_metadata->chunkStats);
    }
  }
}
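
// Entry point for chunk loading: gathers required and optional buffers,
// derives (logical column, fragment) work units from their chunk keys, and
// loads them in parallel worker threads.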
void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_buffers,
                                              const ChunkToBufferMap& optional_buffers,
                                              AbstractBuffer* delete_buffer) {
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());

  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    col_frag_hints.emplace(
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
        chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
      [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
        for (const auto& [col_id, frag_id] : hint_set) {
          loadBuffersUsingLazyParquetChunkLoader(
              col_id, frag_id, buffers_to_load, delete_buffer);
        }
      };
  auto futures = create_futures_for_workers(col_frag_hints, g_max_import_threads, lambda);

  // We wait on all futures, then call get, because we want all threads to have
  // finished before we propagate a potential exception.
  for (auto& future : futures) {
    future.wait();
  }

  for (auto& future : futures) {
    future.get();
  }
}
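
// JSON (de)serialization helpers so that json_utils can read and write
// RowGroupInterval values within the wrapper's serialized state.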
void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
}
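
// The serialized state captures how row groups were mapped to fragments so
// that a restored table (e.g. one recovered from a cache after a restart) can
// resume appends without rescanning unchanged files.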
std::string ParquetDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, total_row_count_, "total_row_count", d.GetAllocator());
  return json_utils::write_to_string(d);
}

void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
  json_utils::get_value_from_object(
      d, last_fragment_row_count_, "last_fragment_row_count");
  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");

  CHECK(chunk_metadata_map_.empty());
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}

bool ParquetDataWrapper::isRestored() const {
  return is_restored_;
}

DataPreview ParquetDataWrapper::getDataPreview(const size_t num_rows) {
  LazyParquetChunkLoader chunk_loader(
      file_system_, file_reader_cache_.get(), nullptr);
  auto file_paths = getAllFilePaths();
  if (file_paths.empty()) {
    throw ForeignStorageException{"No file found at \"" +
                                  getFullFilePath(foreign_table_) + "\""};
  }
  return chunk_loader.previewFiles(file_paths, num_rows);
}

// Declared in three derived classes to avoid
// polluting the ForeignDataWrapper virtual base.
// @TODO refactor into a lower class if needed
void ParquetDataWrapper::createRenderGroupAnalyzers() {
  // must have these
  CHECK_GE(db_id_, 0);
  CHECK(foreign_table_);

  // populate map for all poly columns in this table
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  for (auto const& column : columns) {
    if (IS_GEO_POLY(column->columnType.get_type())) {
      CHECK(render_group_analyzer_map_
                .try_emplace(column->columnId,
                             std::make_unique<import_export::RenderGroupAnalyzer>())
                .second);
    }
  }
}

}  // namespace foreign_storage