OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ParquetDataWrapper.h"
18 
19 #include <queue>
20 
21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
23 
24 #include "Catalog/Catalog.h"
26 #include "FsiChunkUtils.h"
27 #include "FsiJsonUtils.h"
29 #include "LazyParquetChunkLoader.h"
30 #include "ParquetShared.h"
31 #include "Shared/SysDefinitions.h"
32 #include "Shared/file_path_util.h"
33 #include "Shared/misc.h"
34 #include "Utils/DdlUtils.h"
35 
36 namespace foreign_storage {
37 
38 namespace {
39 void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
40  std::shared_ptr<ChunkMetadata> reduce_from) {
41  CHECK(reduce_to->sqlType == reduce_from->sqlType);
42  reduce_to->numBytes += reduce_from->numBytes;
43  reduce_to->numElements += reduce_from->numElements;
44  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;
45 
46  auto column_type = reduce_to->sqlType;
47  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
48 
49  // metadata reducution is done at metadata scan time, both string & geometry
50  // columns have no valid stats to reduce beyond `has_nulls`
51  if (column_type.is_string() || column_type.is_geometry()) {
52  // Reset to invalid range, as formerly valid metadata
53  // needs to be invalidated during an append for these types
54  reduce_to->chunkStats.max = reduce_from->chunkStats.max;
55  reduce_to->chunkStats.min = reduce_from->chunkStats.min;
56  return;
57  }
58 
59  ForeignStorageBuffer buffer_to;
60  buffer_to.initEncoder(column_type);
61  auto encoder_to = buffer_to.getEncoder();
62  encoder_to->resetChunkStats(reduce_to->chunkStats);
63 
64  ForeignStorageBuffer buffer_from;
65  buffer_from.initEncoder(column_type);
66  auto encoder_from = buffer_from.getEncoder();
67  encoder_from->resetChunkStats(reduce_from->chunkStats);
68 
69  encoder_to->reduceStats(*encoder_from);
70  auto updated_metadata = std::make_shared<ChunkMetadata>();
71  encoder_to->getMetadata(updated_metadata);
72  reduce_to->chunkStats = updated_metadata->chunkStats;
73 }
74 } // namespace
75 
77  : do_metadata_stats_validation_(true), db_id_(-1), foreign_table_(nullptr) {}
78 
80  std::shared_ptr<arrow::fs::FileSystem> file_system)
81  : do_metadata_stats_validation_(false)
82  , db_id_(-1)
83  , foreign_table_(foreign_table)
84  , last_fragment_index_(0)
85  , last_fragment_row_count_(0)
86  , total_row_count_(0)
87  , last_file_row_count_(0)
88  , last_row_group_(0)
89  , is_restored_(false)
90  , file_system_(file_system)
91  , file_reader_cache_(std::make_unique<FileReaderMap>()) {}
92 
94  const ForeignTable* foreign_table,
95  const bool do_metadata_stats_validation)
96  : do_metadata_stats_validation_(do_metadata_stats_validation)
97  , db_id_(db_id)
98  , foreign_table_(foreign_table)
99  , last_fragment_index_(0)
100  , last_fragment_row_count_(0)
101  , total_row_count_(0)
102  , last_file_row_count_(0)
103  , last_row_group_(0)
104  , is_restored_(false)
105  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
106  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
107  auto& server_options = foreign_table->foreign_server->options;
108  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
109  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
110  } else {
111  UNREACHABLE();
112  }
113 }
114 
118 
119  last_row_group_ = 0;
122  total_row_count_ = 0;
124  file_reader_cache_->clear();
125 }
126 
127 std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
128  const Interval<ColumnType>& column_interval) {
130  CHECK(catalog);
131  const auto& columns = schema_->getLogicalAndPhysicalColumns();
132  auto column_start = column_interval.start;
133  auto column_end = column_interval.end;
134  std::list<const ColumnDescriptor*> columns_to_init;
135  for (const auto column : columns) {
136  auto column_id = column->columnId;
137  if (column_id >= column_start && column_id <= column_end) {
138  columns_to_init.push_back(column);
139  }
140  }
141  return columns_to_init;
142 }
143 
145  const int fragment_index,
146  const Interval<ColumnType>& column_interval,
147  const ChunkToBufferMap& required_buffers,
148  const bool reserve_buffers_and_set_stats) {
149  for (const auto column : getColumnsToInitialize(column_interval)) {
150  Chunk_NS::Chunk chunk{column, false};
151  ChunkKey data_chunk_key;
152  if (column->columnType.is_varlen_indeed()) {
153  data_chunk_key = {
154  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
155  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
156  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
157  chunk.setBuffer(data_buffer);
158 
159  ChunkKey index_chunk_key{
160  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
161  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
162  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
163  chunk.setIndexBuffer(index_buffer);
164  } else {
165  data_chunk_key = {
166  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
167  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
168  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
169  chunk.setBuffer(data_buffer);
170  }
171  chunk.initEncoder();
172  if (reserve_buffers_and_set_stats) {
173  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
174  CHECK(metadata_it != chunk_metadata_map_.end());
175  auto buffer = chunk.getBuffer();
176  auto& metadata = metadata_it->second;
177  auto encoder = buffer->getEncoder();
178  encoder->resetChunkStats(metadata->chunkStats);
179  encoder->setNumElems(metadata->numElements);
180  if ((column->columnType.is_string() &&
181  column->columnType.get_compression() == kENCODING_NONE) ||
182  column->columnType.is_geometry()) {
183  // non-dictionary string or geometry WKT string
184  auto index_buffer = chunk.getIndexBuf();
185  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
186  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
187  auto index_buffer = chunk.getIndexBuf();
188  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
189  } else {
190  size_t num_bytes_to_reserve =
191  metadata->numElements * column->columnType.get_size();
192  buffer->reserve(num_bytes_to_reserve);
193  }
194  }
195  }
196 }
197 
201 }
202 
203 void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
204  const auto last_fragment_entry =
206  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
207 
208  last_fragment_entry->second.back().end_index = last_row_group_;
212  RowGroupInterval{file_path, row_group});
213  setLastFileRowCount(file_path);
214 }
215 
216 bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
217  const auto last_fragment_entry =
219  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
220 
221  // The entry for the first fragment starts out as an empty vector
222  if (last_fragment_entry->second.empty()) {
223  // File roll off can result in empty older fragments.
226  }
227  return true;
228  } else {
229  return (last_fragment_entry->second.back().file_path != file_path);
230  }
231 }
232 
233 void ParquetDataWrapper::addNewFile(const std::string& file_path) {
234  const auto last_fragment_entry =
236  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
237 
238  // The entry for the first fragment starts out as an empty vector
239  if (last_fragment_entry->second.empty()) {
240  // File roll off can result in empty older fragments.
243  }
244  } else {
245  last_fragment_entry->second.back().end_index = last_row_group_;
246  }
247  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
248  setLastFileRowCount(file_path);
249 }
250 
251 void ParquetDataWrapper::setLastFileRowCount(const std::string& file_path) {
252  auto reader = file_reader_cache_->getOrInsert(file_path, file_system_);
253  last_file_row_count_ = reader->parquet_reader()->metadata()->num_rows();
254 }
255 
258  CHECK(catalog);
259  std::vector<std::string> new_file_paths;
260  auto processed_file_paths = getOrderedProcessedFilePaths();
261  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
262  auto all_file_paths = getAllFilePaths();
264  const auto rolled_off_files =
265  shared::check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
266  updateMetadataForRolledOffFiles(rolled_off_files);
267  }
268 
269  for (const auto& file_path : processed_file_paths) {
270  if (!shared::contains(all_file_paths, file_path)) {
271  throw_removed_file_error(file_path);
272  }
273  }
274 
275  // For multi-file appends, reprocess the last file in order to account for appends
276  // that may have occurred to this file. For single file appends, reprocess file if new
277  // rows have been added.
278  if (!processed_file_paths.empty()) {
279  // Single file append
280  if (all_file_paths.size() == 1) {
281  CHECK_EQ(processed_file_paths.size(), size_t(1));
282  CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
283  }
284 
285  const auto& last_file_path = processed_file_paths.back();
286  // Since an existing file is being appended to we need to update the cached
287  // FileReader as the existing one will be out of date.
288  auto reader = file_reader_cache_->insert(last_file_path, file_system_);
289  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
290  if (row_count < last_file_row_count_) {
291  throw_removed_row_in_file_error(last_file_path);
292  } else if (row_count > last_file_row_count_) {
293  removeMetadataForLastFile(last_file_path);
294  new_file_paths.emplace_back(last_file_path);
295  }
296  }
297 
298  for (const auto& file_path : all_file_paths) {
299  if (!shared::contains(processed_file_paths, file_path)) {
300  new_file_paths.emplace_back(file_path);
301  }
302  }
303  } else {
304  CHECK(chunk_metadata_map_.empty());
305  new_file_paths = getAllFilePaths();
307  }
308 
309  if (!new_file_paths.empty()) {
310  metadataScanFiles(new_file_paths);
311  }
312 }
313 
315  const std::set<std::string>& rolled_off_files) {
316  if (!rolled_off_files.empty()) {
317  std::set<int32_t> deleted_fragment_ids;
318  std::optional<int32_t> partially_deleted_fragment_id;
319  std::vector<std::string> remaining_files_in_partially_deleted_fragment;
320  for (auto& [fragment_id, row_group_interval_vec] :
322  for (auto it = row_group_interval_vec.begin();
323  it != row_group_interval_vec.end();) {
324  if (shared::contains(rolled_off_files, it->file_path)) {
325  it = row_group_interval_vec.erase(it);
326  } else {
327  remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
328  it++;
329  }
330  }
331  if (row_group_interval_vec.empty()) {
332  deleted_fragment_ids.emplace(fragment_id);
333  } else {
334  CHECK(!remaining_files_in_partially_deleted_fragment.empty());
335  partially_deleted_fragment_id = fragment_id;
336  break;
337  }
338  }
339 
340  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
341  const auto& chunk_key = it->first;
342  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
343  auto& chunk_metadata = it->second;
344  chunk_metadata->numElements = 0;
345  chunk_metadata->numBytes = 0;
346  it++;
347  } else if (partially_deleted_fragment_id.has_value() &&
348  chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
349  // Metadata for the partially deleted fragment will be re-populated.
350  it = chunk_metadata_map_.erase(it);
351  } else {
352  it++;
353  }
354  }
355 
356  if (partially_deleted_fragment_id.has_value()) {
357  // Create map of row group to row group metadata for remaining files in the
358  // fragment.
359  auto row_group_metadata_map =
360  getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);
361 
362  // Re-populate metadata for remaining row groups in partially deleted fragment.
363  auto column_interval =
364  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
365  schema_->getLogicalAndPhysicalColumns().back()->columnId};
366  auto row_group_intervals = shared::get_from_map(
367  fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
368  for (const auto& row_group_interval : row_group_intervals) {
369  for (auto row_group = row_group_interval.start_index;
370  row_group <= row_group_interval.end_index;
371  row_group++) {
372  const auto& row_group_metadata_item = shared::get_from_map(
373  row_group_metadata_map, {row_group_interval.file_path, row_group});
374  updateChunkMetadataForFragment(column_interval,
375  row_group_metadata_item.column_chunk_metadata,
376  partially_deleted_fragment_id.value());
377  }
378  }
379  }
380  }
381 }
382 
384  std::vector<std::string> file_paths;
385  for (const auto& entry : fragment_to_row_group_interval_map_) {
386  for (const auto& row_group_interval : entry.second) {
387  if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
388  file_paths.emplace_back(row_group_interval.file_path);
389  }
390  }
391  }
392  return file_paths;
393 }
394 
395 std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
396  auto timer = DEBUG_TIMER(__func__);
397  std::vector<std::string> found_file_paths;
398  auto file_path = getFullFilePath(foreign_table_);
399  const auto file_path_options = getFilePathOptions(foreign_table_);
400  auto& server_options = foreign_table_->foreign_server->options;
401  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
402  found_file_paths = shared::local_glob_filter_sort_files(file_path, file_path_options);
403  } else {
404  UNREACHABLE();
405  }
406  return found_file_paths;
407 }
408 
409 void ParquetDataWrapper::metadataScanFiles(const std::vector<std::string>& file_paths) {
410  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
411  metadataScanRowGroupMetadata(row_group_metadata);
412 }
413 
415  const std::list<RowGroupMetadata>& row_group_metadata) {
416  auto column_interval =
417  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
418  schema_->getLogicalAndPhysicalColumns().back()->columnId};
419  for (const auto& row_group_metadata_item : row_group_metadata) {
420  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
421  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
422  const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
423  auto row_group = row_group_metadata_item.row_group_index;
424  const auto& file_path = row_group_metadata_item.file_path;
425  if (moveToNextFragment(import_row_count)) {
426  addNewFragment(row_group, file_path);
427  } else if (isNewFile(file_path)) {
428  CHECK_EQ(row_group, 0);
429  addNewFile(file_path);
430  }
431  last_row_group_ = row_group;
433  column_interval, column_chunk_metadata, last_fragment_index_);
434 
435  last_fragment_row_count_ += import_row_count;
436  total_row_count_ += import_row_count;
437  }
439 }
440 
442  const std::vector<std::string>& file_paths) const {
443  LazyParquetChunkLoader chunk_loader(
445  return chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
446 }
447 
449  const Interval<ColumnType>& column_interval,
450  const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
451  int32_t fragment_id) {
452  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
453  schema_->numLogicalAndPhysicalColumns());
454  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
455  for (auto column_id = column_interval.start; column_id <= column_interval.end;
456  column_id++, column_chunk_metadata_iter++) {
457  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
458  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
459  const auto& type_info = column_descriptor->columnType;
460  ChunkKey const data_chunk_key =
461  type_info.is_varlen_indeed()
462  ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
463  : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
464  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
465 
466  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
467  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
468  } else {
469  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
470  }
471  }
472 }
473 
474 bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
475  return (last_fragment_row_count_ + new_rows_count) >
476  static_cast<size_t>(foreign_table_->maxFragRows);
477 }
478 
480  ChunkMetadataVector& chunk_metadata_vector) {
482  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
483  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
484  }
485 }
486 
488  const int logical_column_id,
489  const int fragment_id,
490  const ChunkToBufferMap& required_buffers,
491  AbstractBuffer* delete_buffer) {
492  const auto& row_group_intervals =
494  // File roll off can lead to an empty row group interval vector.
495  if (row_group_intervals.empty()) {
496  return;
497  }
498 
500  CHECK(catalog);
501  const ColumnDescriptor* logical_column =
502  schema_->getColumnDescriptor(logical_column_id);
503  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
504 
505  const Interval<ColumnType> column_interval = {
506  logical_column_id,
507  logical_column_id + logical_column->columnType.get_physical_cols()};
508  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
509 
510  const bool is_dictionary_encoded_string_column =
511  logical_column->columnType.is_dict_encoded_string() ||
512  (logical_column->columnType.is_array() &&
513  logical_column->columnType.get_elem_type().is_dict_encoded_string());
514 
515  StringDictionary* string_dictionary = nullptr;
516  if (is_dictionary_encoded_string_column) {
517  auto dict_descriptor =
518  catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
519  CHECK(dict_descriptor);
520  string_dictionary = dict_descriptor->stringDict.get();
521  }
522 
523  std::list<Chunk_NS::Chunk> chunks;
524  for (int column_id = column_interval.start; column_id <= column_interval.end;
525  ++column_id) {
526  auto column_descriptor = schema_->getColumnDescriptor(column_id);
527  Chunk_NS::Chunk chunk{column_descriptor, false};
528  if (column_descriptor->columnType.is_varlen_indeed()) {
529  ChunkKey data_chunk_key = {
530  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
531  auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
532  chunk.setBuffer(buffer);
533  ChunkKey index_chunk_key = {
534  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
535  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
536  chunk.setIndexBuffer(index_buffer);
537  } else {
538  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
539  auto buffer = shared::get_from_map(required_buffers, chunk_key);
540  chunk.setBuffer(buffer);
541  }
542  chunks.emplace_back(chunk);
543  }
544 
545  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
546  if (delete_buffer) {
547  rejected_row_indices = std::make_unique<RejectedRowIndices>();
548  }
550  file_reader_cache_.get(),
553  auto metadata = chunk_loader.loadChunk(row_group_intervals,
554  parquet_column_index,
555  chunks,
556  string_dictionary,
557  rejected_row_indices.get());
558 
559  if (delete_buffer) {
560  // all modifying operations on `delete_buffer` must be synchronized as it is a
561  // shared buffer
562  std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);
563 
564  CHECK(!chunks.empty());
565  CHECK(chunks.begin()->getBuffer()->hasEncoder());
566  auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
567 
568  // ensure delete buffer is sized appropriately
569  if (delete_buffer->size() < num_rows_in_chunk) {
570  auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
571  std::vector<int8_t> data(remaining_rows, false);
572  delete_buffer->append(data.data(), remaining_rows);
573  }
574 
575  // compute a logical OR of the current `delete_buffer` contents and this chunk's
576  // rejected indices
577  CHECK(rejected_row_indices);
578  auto delete_buffer_data = delete_buffer->getMemoryPtr();
579  for (const auto& rejected_index : *rejected_row_indices) {
580  CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
581  delete_buffer_data[rejected_index] = true;
582  }
583  }
584 
585  auto metadata_iter = metadata.begin();
586  for (int column_id = column_interval.start; column_id <= column_interval.end;
587  ++column_id, ++metadata_iter) {
588  auto column = schema_->getColumnDescriptor(column_id);
589  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
590  if (column->columnType.is_varlen_indeed()) {
591  data_chunk_key.emplace_back(1);
592  }
593  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
594 
595  // Allocate a new shared_ptr for the metadata so we don't modify the old one, which
596  // may be used by the executor
597  auto cached_metadata_previous =
598  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
599  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
600  std::make_shared<ChunkMetadata>();
601  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
602  *cached_metadata = *cached_metadata_previous;
603 
604  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
605  cached_metadata->numBytes =
606  shared::get_from_map(required_buffers, data_chunk_key)->size();
607 
608  // for certain types, update the metadata statistics
609  // should update the cache, and the internal chunk_metadata_map_
610  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
611  CHECK(metadata_iter != metadata.end());
612  cached_metadata->chunkStats = (*metadata_iter)->chunkStats;
613 
614  // Update stats on buffer so it is saved in cache
615  shared::get_from_map(required_buffers, data_chunk_key)
616  ->getEncoder()
617  ->resetChunkStats(cached_metadata->chunkStats);
618  }
619  }
620 }
621 
623  const ChunkToBufferMap& optional_buffers,
624  AbstractBuffer* delete_buffer) {
625  ChunkToBufferMap buffers_to_load;
626  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
627  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
628 
629  CHECK(!buffers_to_load.empty());
630 
631  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
632  for (const auto& [chunk_key, buffer] : buffers_to_load) {
633  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
634  col_frag_hints.emplace(
635  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
636  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
637  }
638 
639  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
640  [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
641  for (const auto& [col_id, frag_id] : hint_set) {
643  col_id, frag_id, buffers_to_load, delete_buffer);
644  }
645  };
646 
649  auto futures = create_futures_for_workers(col_frag_hints, num_threads, lambda);
650 
651  // We wait on all futures, then call get because we want all threads to have finished
652  // before we propagate a potential exception.
653  for (auto& future : futures) {
654  future.wait();
655  }
656 
657  for (auto& future : futures) {
658  future.get();
659  }
660 }
661 
662 void set_value(rapidjson::Value& json_val,
663  const RowGroupInterval& value,
664  rapidjson::Document::AllocatorType& allocator) {
665  json_val.SetObject();
666  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
667  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
668  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
669 }
670 
671 void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
672  CHECK(json_val.IsObject());
673  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
674  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
675  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
676 }
677 
679  rapidjson::Document d;
680  d.SetObject();
681 
684  "fragment_to_row_group_interval_map",
685  d.GetAllocator());
686  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
688  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
690  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
692  d, total_row_count_, "total_row_count", d.GetAllocator());
694  d, last_file_row_count_, "last_file_row_count", d.GetAllocator());
695  return json_utils::write_to_string(d);
696 }
697 
699  const std::string& file_path,
700  const ChunkMetadataVector& chunk_metadata_vector) {
701  auto d = json_utils::read_from_file(file_path);
702  CHECK(d.IsObject());
703 
705  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
707  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
709  d, last_fragment_row_count_, "last_fragment_row_count");
711  if (d.HasMember("last_file_row_count")) {
712  json_utils::get_value_from_object(d, last_file_row_count_, "last_file_row_count");
713  }
714 
715  CHECK(chunk_metadata_map_.empty());
716  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
717  chunk_metadata_map_[chunk_key] = chunk_metadata;
718  }
719  is_restored_ = true;
720 }
721 
723  return is_restored_;
724 }
725 
728  file_reader_cache_.get(),
731  auto file_paths = getAllFilePaths();
732  if (file_paths.empty()) {
733  throw ForeignStorageException{"No file found at \"" +
735  }
736  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
737 }
738 
739 // declared in three derived classes to avoid
740 // polluting ForeignDataWrapper virtual base
741 // @TODO refactor to lower class if needed
743  // must have these
744  CHECK_GE(db_id_, 0);
746 
747  // populate map for all poly columns in this table
749  CHECK(catalog);
750  auto columns =
751  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
752  for (auto const& column : columns) {
753  if (IS_GEO_POLY(column->columnType.get_type())) {
755  .try_emplace(column->columnId,
756  std::make_unique<import_export::RenderGroupAnalyzer>())
757  .second);
758  }
759  }
760 }
761 
762 void ParquetDataWrapper::removeMetadataForLastFile(const std::string& last_file_path) {
763  std::optional<int32_t> first_deleted_fragment_id;
764  for (auto it = fragment_to_row_group_interval_map_.begin();
766  const auto fragment_id = it->first;
767  const auto& row_group_intervals = it->second;
768  for (const auto& row_group_interval : row_group_intervals) {
769  if (first_deleted_fragment_id.has_value()) {
770  // All subsequent fragments should map to the last file.
771  CHECK_EQ(last_file_path, row_group_interval.file_path);
772  } else if (last_file_path == row_group_interval.file_path) {
773  first_deleted_fragment_id = fragment_id;
774  }
775  }
776  if (first_deleted_fragment_id.has_value() &&
777  first_deleted_fragment_id.value() < fragment_id) {
779  } else {
780  it++;
781  }
782  }
783  CHECK(first_deleted_fragment_id.has_value());
784 
785  std::map<int32_t, size_t> remaining_fragments_row_counts;
786  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
787  auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
788  if (fragment_id >= first_deleted_fragment_id.value()) {
789  it = chunk_metadata_map_.erase(it);
790  } else {
791  auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
792  if (fragment_count_it == remaining_fragments_row_counts.end()) {
793  remaining_fragments_row_counts[fragment_id] = it->second->numElements;
794  } else {
795  CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
796  }
797  it++;
798  }
799  }
800 
801  total_row_count_ = 0;
802  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
803  total_row_count_ += row_count;
804  }
805 
806  // Re-populate metadata for last fragment with deleted rows, excluding metadata for the
807  // last file.
808  auto row_group_intervals_to_scan = shared::get_from_map(
809  fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
810  auto it = std::find_if(row_group_intervals_to_scan.begin(),
811  row_group_intervals_to_scan.end(),
812  [&last_file_path](const auto& row_group_interval) {
813  return row_group_interval.file_path == last_file_path;
814  });
815  CHECK(it != row_group_intervals_to_scan.end());
816  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());
817 
818  if (first_deleted_fragment_id.value() > 0) {
819  last_fragment_index_ = first_deleted_fragment_id.value() - 1;
821  shared::get_from_map(remaining_fragments_row_counts, last_fragment_index_);
822  const auto& last_row_group_intervals =
824  if (last_row_group_intervals.empty()) {
825  last_row_group_ = 0;
826  } else {
827  last_row_group_ = last_row_group_intervals.back().end_index;
828  }
829  fragment_to_row_group_interval_map_.erase(first_deleted_fragment_id.value());
830  } else {
831  CHECK_EQ(total_row_count_, size_t(0));
833  }
834 
835  if (!row_group_intervals_to_scan.empty()) {
836  metadataScanRowGroupIntervals(row_group_intervals_to_scan);
837  }
838 }
839 
841  const std::vector<RowGroupInterval>& row_group_intervals) {
842  std::vector<std::string> file_paths;
843  for (const auto& row_group_interval : row_group_intervals) {
844  file_paths.emplace_back(row_group_interval.file_path);
845  }
846  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
847  std::list<RowGroupMetadata> row_group_metadata;
848  for (const auto& row_group_interval : row_group_intervals) {
849  for (auto row_group = row_group_interval.start_index;
850  row_group <= row_group_interval.end_index;
851  row_group++) {
852  row_group_metadata.emplace_back(shared::get_from_map(
853  row_group_metadata_map, {row_group_interval.file_path, row_group}));
854  }
855  }
856  metadataScanRowGroupMetadata(row_group_metadata);
857 }
858 
859 std::map<FilePathAndRowGroup, RowGroupMetadata>
861  const std::vector<std::string>& file_paths) const {
862  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
863  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
864  for (const auto& row_group_metadata_item : row_group_metadata) {
865  row_group_metadata_map[{row_group_metadata_item.file_path,
866  row_group_metadata_item.row_group_index}] =
867  row_group_metadata_item;
868  }
869  return row_group_metadata_map;
870 }
871 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:195
std::string getSerializedDataWrapper() const override
#define CHECK_EQ(x, y)
Definition: Logger.h:297
std::vector< int > ChunkKey
Definition: types.h:36
std::unique_ptr< FileReaderMap > file_reader_cache_
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
void updateMetadataForRolledOffFiles(const std::set< std::string > &rolled_off_files)
std::vector< std::string > getOrderedProcessedFilePaths()
shared utility for globbing files, paths can be specified as either a single file, directory or wildcards
std::string tableName
void setLastFileRowCount(const std::string &file_path)
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
std::vector< std::string > getAllFilePaths()
void throw_removed_row_in_file_error(const std::string &file_path)
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
size_t get_num_threads(const ForeignTable &table)
virtual int8_t * getMemoryPtr()=0
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
#define UNREACHABLE()
Definition: Logger.h:333
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
#define CHECK_GE(x, y)
Definition: Logger.h:302
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
Definition: CsvShared.cpp:44
#define CHECK_GT(x, y)
Definition: Logger.h:301
void metadataScanRowGroupIntervals(const std::vector< RowGroupInterval > &row_group_intervals)
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::set< std::string > check_for_rolled_off_file_paths(const std::vector< std::string > &all_file_paths, std::vector< std::string > &processed_file_paths)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
Definition: Encoder.h:274
int32_t StringOffsetT
Definition: sqltypes.h:1232
void throw_removed_file_error(const std::string &file_path)
This file contains the class specification and related data structures for Catalog.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
DataPreview getDataPreview(const size_t num_rows)
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:172
int get_physical_cols() const
Definition: sqltypes.h:411
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
RenderGroupAnalyzerMap render_group_analyzer_map_
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
rapidjson::Document read_from_file(const std::string &file_path)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:157
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
void removeMetadataForLastFile(const std::string &last_file_path)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
static bool allowFileRollOff(const ForeignTable *foreign_table)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
void metadataScanFiles(const std::vector< std::string > &file_paths)
int32_t ArrayOffsetT
Definition: sqltypes.h:1233
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:389
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
bool g_enable_watchdog false
Definition: Execute.cpp:79
#define CHECK(condition)
Definition: Logger.h:289
bool is_geometry() const
Definition: sqltypes.h:588
#define DEBUG_TIMER(name)
Definition: Logger.h:407
std::string write_to_string(const rapidjson::Document &document)
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
bool is_dict_encoded_string() const
Definition: sqltypes.h:628
SQLTypeInfo columnType
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:957
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
Definition: CsvShared.cpp:26
void createRenderGroupAnalyzers() override
Create RenderGroupAnalyzers for poly columns.
bool is_array() const
Definition: sqltypes.h:584
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const
#define IS_GEO_POLY(T)
Definition: sqltypes.h:303