#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>

namespace foreign_storage {
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  // For array columns, stats are tracked on the element type.
  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
  if (column_type.is_string() || column_type.is_geometry()) {
    // String and geometry columns have no reducible min/max stats; take the
    // incoming values as-is.
    reduce_to->chunkStats.max = reduce_from->chunkStats.max;
    reduce_to->chunkStats.min = reduce_from->chunkStats.min;
    return;
  }

  // ... (encoders are constructed for both sides, then reduced)
  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}
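// Usage sketch (hypothetical variables, not from the original file): merging
// per-row-group metadata for one column into a fragment-level aggregate.
// Counts accumulate, has_nulls ORs together, and min/max widen via the
// encoder's reduceStats().
//
//   auto fragment_meta = std::make_shared<ChunkMetadata>(*row_group_0_meta);
//   reduce_metadata(fragment_meta, row_group_1_meta);
//   // fragment_meta->numElements == row_group_0_meta->numElements +
//   //                               row_group_1_meta->numElements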
// Constructor definitions; the ParquetDataWrapper class name is inferred from
// the Parquet chunk loader usage in this file.
ParquetDataWrapper::ParquetDataWrapper()
    : do_metadata_stats_validation_(true), db_id_(-1), foreign_table_(nullptr) {}
ParquetDataWrapper::ParquetDataWrapper(const ForeignTable* foreign_table,
                                       std::shared_ptr<arrow::fs::FileSystem> file_system)
    : do_metadata_stats_validation_(false)
    // ...
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    // ...
    , last_file_row_count_(0)
    // ...
    , file_system_(file_system)
    /* ... */ {}
ParquetDataWrapper::ParquetDataWrapper(/* ... */
                                       const ForeignTable* foreign_table,
                                       const bool do_metadata_stats_validation)
    : do_metadata_stats_validation_(do_metadata_stats_validation)
    // ...
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_file_row_count_(0)
    // ...
    , is_restored_(false)
    /* ... */ {
  // ... (the elided branch selects the backing file system; the local
  // storage case creates a LocalFileSystem)
  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  // ...
}
std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}
void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    const ChunkToBufferMap& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    // ...
    if (column->columnType.is_varlen_indeed()) {
      // Variable-length columns need both a data buffer and an index buffer.
      // ...
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      // ...
      chunk.setBuffer(data_buffer);
      // ...
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      // ...
      chunk.setIndexBuffer(index_buffer);
    } else {
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      // ...
      chunk.setBuffer(data_buffer);
    }
    // ...
    if (reserve_buffers_and_set_stats) {
      // ...
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if ((column->columnType.is_string() &&
           !column->columnType.is_dict_encoded_string()) ||
          column->columnType.is_geometry()) {
        // Offset buffers hold one entry per element plus a trailing sentinel.
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() &&
                 column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}
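// Illustrative helper (not part of the original source): why the index
// buffers above reserve N + 1 offsets. Element i of a variable-length chunk
// spans bytes [offsets[i], offsets[i + 1]), so the last of N elements needs a
// trailing sentinel offset. StringOffsetT is assumed to be an integer type,
// matching the reserve() calls above.
namespace {
std::vector<StringOffsetT> make_offsets_sketch(const std::vector<std::string>& vals) {
  std::vector<StringOffsetT> offsets;
  offsets.reserve(vals.size() + 1);  // mirrors sizeof(StringOffsetT) * (N + 1)
  StringOffsetT running = 0;
  offsets.push_back(running);
  for (const auto& v : vals) {
    running += static_cast<StringOffsetT>(v.size());
    offsets.push_back(running);
  }
  return offsets;
}
}  // namespace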
void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  // ... (close the current interval and open a new fragment entry)
}
bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  // ...
  // The first fragment's entry starts out empty; any file is then new.
  if (last_fragment_entry->second.empty()) {
    // ...
    return true;
  }
  return (last_fragment_entry->second.back().file_path != file_path);
}
void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  // ...
  if (last_fragment_entry->second.empty()) {
    // ...
  }
  // ...
}
void ParquetDataWrapper::fetchChunkMetadata() {
  // ...
  std::vector<std::string> new_file_paths;
  auto processed_file_paths = getOrderedProcessedFilePaths();
  // Append mode: reconcile processed files against what is on disk.
  auto all_file_paths = getAllFilePaths();
  const auto rolled_off_files =
      check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
  updateMetadataForRolledOffFiles(rolled_off_files);
  for (const auto& file_path : processed_file_paths) {
    // ... (error out if a previously processed file was removed)
  }
  if (!processed_file_paths.empty()) {
    // A single on-disk file must be the single processed file.
    if (all_file_paths.size() == 1) {
      CHECK_EQ(processed_file_paths.size(), size_t(1));
      CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
    }
    // Re-scan the most recent file if rows were appended to it.
    const auto& last_file_path = processed_file_paths.back();
    // ...
    size_t row_count = reader->parquet_reader()->metadata()->num_rows();
    // ...
    new_file_paths.emplace_back(last_file_path);
  }
  for (const auto& file_path : all_file_paths) {
    // ... (queue files not seen before)
    new_file_paths.emplace_back(file_path);
  }
  // ...
  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}
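// Hypothetical helper capturing the append-mode decision above: compare the
// last processed file's current row count against the count recorded at the
// previous scan (last_file_row_count_).
namespace {
enum class LastFileAction { kError, kSkip, kRescan };

LastFileAction classify_last_file_sketch(size_t row_count, size_t last_row_count) {
  if (row_count < last_row_count) {
    return LastFileAction::kError;   // rows removed from an append-only source
  }
  if (row_count > last_row_count) {
    return LastFileAction::kRescan;  // rows appended; rescan this file
  }
  return LastFileAction::kSkip;      // unchanged
}
}  // namespace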
void ParquetDataWrapper::updateMetadataForRolledOffFiles(
    const std::set<std::string>& rolled_off_files) {
  if (!rolled_off_files.empty()) {
    std::set<int32_t> deleted_fragment_ids;
    std::optional<int32_t> partially_deleted_fragment_id;
    std::vector<std::string> remaining_files_in_partially_deleted_fragment;
    for (auto& [fragment_id, row_group_interval_vec] :
         fragment_to_row_group_interval_map_) {
      for (auto it = row_group_interval_vec.begin();
           it != row_group_interval_vec.end();) {
        if (contains(rolled_off_files, it->file_path)) {
          it = row_group_interval_vec.erase(it);
        } else {
          remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
          it++;
        }
      }
      if (row_group_interval_vec.empty()) {
        deleted_fragment_ids.emplace(fragment_id);
      } else {
        CHECK(!remaining_files_in_partially_deleted_fragment.empty());
        partially_deleted_fragment_id = fragment_id;
        // ...
      }
    }

    for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
      const auto& chunk_key = it->first;
      if (contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
        // Fully deleted fragments keep their keys but zero out their metadata.
        auto& chunk_metadata = it->second;
        chunk_metadata->numElements = 0;
        chunk_metadata->numBytes = 0;
        it++;
      } else if (partially_deleted_fragment_id.has_value() &&
                 chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
                     partially_deleted_fragment_id.value()) {
        // Metadata for a partially deleted fragment is rebuilt below.
        it = chunk_metadata_map_.erase(it);
      } else {
        it++;
      }
    }

    if (partially_deleted_fragment_id.has_value()) {
      // Re-populate metadata from the row groups that remain in the fragment.
      auto row_group_metadata_map =
          getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);
      auto column_interval =
          Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                               schema_->getLogicalAndPhysicalColumns().back()->columnId};
      const auto& row_group_intervals = get_from_map(
          fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
      for (const auto& row_group_interval : row_group_intervals) {
        for (auto row_group = row_group_interval.start_index;
             row_group <= row_group_interval.end_index;
             row_group++) {
          const auto& row_group_metadata_item = get_from_map(
              row_group_metadata_map, {row_group_interval.file_path, row_group});
          updateChunkMetadataForFragment(column_interval,
                                         row_group_metadata_item.column_chunk_metadata,
                                         partially_deleted_fragment_id.value());
        }
      }
    }
  }
}
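// Worked example (hypothetical file names): suppose fragment 0 covered only
// "f1.parquet" and fragment 1 straddles "f1.parquet" and "f2.parquet". If
// "f1.parquet" rolls off, fragment 0 becomes fully deleted (its metadata is
// kept but zeroed), while fragment 1 is partially deleted: its metadata is
// erased and rebuilt above from the row groups remaining in "f2.parquet".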
std::vector<std::string> ParquetDataWrapper::getOrderedProcessedFilePaths() {
  std::vector<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
        file_paths.emplace_back(row_group_interval.file_path);
      }
    }
  }
  return file_paths;
}
std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
  // ...
  std::vector<std::string> found_file_paths;
  // ... (glob, filter, and sort files under the table's source path)
  return found_file_paths;
}
void ParquetDataWrapper::metadataScanRowGroupMetadata(
    const std::list<RowGroupMetadata>& row_group_metadata) {
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};
  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    // Every column in a row group has the same element count; read it from
    // the first column's metadata.
    const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
    auto row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    // ... (start a new fragment or file as needed, then update chunk metadata)
  }
}
std::list<RowGroupMetadata> ParquetDataWrapper::getRowGroupMetadataForFilePaths(
    const std::vector<std::string>& file_paths) const {
  // ... (delegates to the chunk loader's metadataScan over the given paths)
}
void ParquetDataWrapper::updateChunkMetadataForFragment(
    const Interval<ColumnType>& column_interval,
    const std::list<std::shared_ptr<ChunkMetadata>>& column_chunk_metadata,
    int32_t fragment_id) {
  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
           schema_->numLogicalAndPhysicalColumns());
  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
  for (auto column_id = column_interval.start; column_id <= column_interval.end;
       column_id++, column_chunk_metadata_iter++) {
    CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
    const auto column_descriptor = schema_->getColumnDescriptor(column_id);
    const auto& type_info = column_descriptor->columnType;
    // (reconstructed) varlen chunks carry a trailing data-buffer index
    const ChunkKey data_chunk_key =
        type_info.is_varlen_indeed()
            ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
            : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
    std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
    // ... (insert new metadata or reduce into the existing entry via
    // reduce_metadata)
  }
}

void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  // ...
  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  // ...
}
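// Assumed chunk key layout, inferred from the key construction above and the
// emplace_back(1) in the loader below:
//
//   {db_id, table_id, column_id, fragment_id}      // fixed-length column
//   {db_id, table_id, column_id, fragment_id, 1}   // varlen data buffer
//   {db_id, table_id, column_id, fragment_id, 2}   // varlen index (offsets)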
void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap& required_buffers,
    AbstractBuffer* delete_buffer) {
  // ...
  const auto& row_group_intervals =
      get_from_map(fragment_to_row_group_interval_map_, fragment_id);
  // An empty fragment (e.g. after a roll-off) has nothing to load.
  if (row_group_intervals.empty()) {
    return;
  }

  const auto column_descriptor = schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
  // ... (column_interval spans the logical column and its physical columns)
  const bool is_dictionary_encoded_string_column =
      column_descriptor->columnType.is_dict_encoded_string() /* || ... */;
  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor = /* ... (catalog lookup via get_comp_param()) */;
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    // ... (a chunk is created with buffers taken from required_buffers)
    if (column_descriptor->columnType.is_varlen_indeed()) {
      // ...
      chunk.setBuffer(buffer);
      // ...
      chunk.setIndexBuffer(index_buffer);
    } else {
      // ...
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
  if (delete_buffer) {
    rejected_row_indices = std::make_unique<RejectedRowIndices>();
  }
  // ... (a LazyParquetChunkLoader is constructed here)
  auto metadata = chunk_loader.loadChunk(row_group_intervals,
                                         parquet_column_index,
                                         chunks,
                                         string_dictionary,
                                         rejected_row_indices.get());
  // ...
  if (delete_buffer) {
    CHECK(!chunks.empty());
    CHECK(chunks.begin()->getBuffer()->hasEncoder());
    auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
    // Grow the delete buffer so every row in the chunk has a flag; new rows
    // start out not deleted.
    if (delete_buffer->size() < num_rows_in_chunk) {
      auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
      std::vector<int8_t> data(remaining_rows, false);
      delete_buffer->append(data.data(), remaining_rows);
    }
    // Mark rejected rows as deleted.
    CHECK(rejected_row_indices);
    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto& rejected_index : *rejected_row_indices) {
      CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
      delete_buffer_data[rejected_index] = true;
    }
  }

  // Refresh cached metadata to reflect the just-loaded chunks.
  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    // ...
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    // ...
    auto cached_metadata_previous = /* ... */;
    auto cached_metadata = std::make_shared<ChunkMetadata>();
    *cached_metadata = *cached_metadata_previous;
    // Chunk byte sizes are only known after loading.
    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes = /* ... */;
    // ...
    CHECK(metadata_iter != metadata.end());
    cached_metadata->chunkStats = (*metadata_iter)->chunkStats;
    // ...
    required_buffers.at(data_chunk_key)
        ->getEncoder()
        ->resetChunkStats(cached_metadata->chunkStats);
  }
}
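// Sketch of the delete-buffer bookkeeping above (assumed semantics: one
// int8_t flag per row, false = live, true = deleted/rejected):
namespace {
void apply_rejects_sketch(std::vector<int8_t>& delete_bitmap,
                          size_t num_rows,
                          const std::set<int64_t>& rejected_rows) {
  if (delete_bitmap.size() < num_rows) {
    delete_bitmap.resize(num_rows, false);  // new rows start out not deleted
  }
  for (const auto row : rejected_rows) {
    delete_bitmap[static_cast<size_t>(row)] = true;
  }
}
}  // namespace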
void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_buffers,
                                              const ChunkToBufferMap& optional_buffers,
                                              AbstractBuffer* delete_buffer) {
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    // (reconstructed) hint each (column, fragment) pair for the workers
    col_frag_hints.emplace(
        chunk_key[CHUNK_KEY_COLUMN_IDX], chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
      [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
        for (const auto& [col_id, frag_id] : hint_set) {
          loadBuffersUsingLazyParquetChunkLoader(
              col_id, frag_id, buffers_to_load, delete_buffer);
        }
      };

  auto futures = create_futures_for_workers(
      col_frag_hints, get_num_threads(*foreign_table_), lambda);
  // Wait on every future before calling get(), so all workers finish even if
  // one throws; get() then propagates any stored exception.
  for (auto& future : futures) {
    future.wait();
  }
  for (auto& future : futures) {
    future.get();
  }
}
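// Illustrative stand-in for create_futures_for_workers (matching its
// signature of items, max_threads, lambda, but not its real implementation):
// split the hints into per-worker batches and run each batch asynchronously.
namespace {
template <typename Container, typename Fn>
std::vector<std::future<void>> run_in_batches_sketch(const Container& items,
                                                     size_t max_threads,
                                                     Fn fn) {
  std::vector<std::future<void>> futures;
  if (items.empty() || max_threads == 0) {
    return futures;
  }
  std::vector<Container> batches(std::min(max_threads, items.size()));
  size_t i = 0;
  for (const auto& item : items) {
    batches[i++ % batches.size()].insert(item);  // round-robin assignment
  }
  for (auto& batch : batches) {
    futures.emplace_back(std::async(std::launch::async, fn, batch));
  }
  return futures;
}
}  // namespace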
// (type assumed: the RowGroupInterval overload used by the fragment map
// serialization below)
void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  // ... (serialize the interval's fields)
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  // ... (read the interval's fields back)
}
std::string ParquetDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();
  // ...
  add_value_to_object(d,
                      fragment_to_row_group_interval_map_,
                      "fragment_to_row_group_interval_map",
                      d.GetAllocator());
  // ... (remaining scalar members)
  return write_to_string(d);
}
void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = read_from_file(file_path);
  // ... (restore the fragment map and scalar members)
  // Older serialized state may predate this field.
  if (d.HasMember("last_file_row_count")) {
    get_value_from_object(d, last_file_row_count_, "last_file_row_count");
  }
  // ...
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}
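// Assumed shape of the serialized state, reconstructed from the member and
// key names above (illustrative only):
//
//   {
//     "fragment_to_row_group_interval_map": {"0": [{"file_path": "...", ...}]},
//     ...
//     "last_file_row_count": 123
//   }
//
// The HasMember() guard above keeps older serialized state, written before
// last_file_row_count existed, loadable.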
DataPreview ParquetDataWrapper::getDataPreview(const size_t num_rows) {
  // ...
  if (file_paths.empty()) {
    // ... (no files to preview)
  }
  // ...
  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
}
void ParquetDataWrapper::createRenderGroupAnalyzers() {
  // Create RenderGroupAnalyzers for poly columns.
  // ...
  for (auto const& column : columns) {
    // ...
    render_group_analyzer_map_
        .try_emplace(column->columnId,
                     std::make_unique<import_export::RenderGroupAnalyzer>())
        /* ... */;
  }
}
void ParquetDataWrapper::removeMetadataForLastFile(const std::string& last_file_path) {
  std::optional<int32_t> first_deleted_fragment_id;
  for (auto it = fragment_to_row_group_interval_map_.begin();
       it != fragment_to_row_group_interval_map_.end();) {
    const auto fragment_id = it->first;
    const auto& row_group_intervals = it->second;
    for (const auto& row_group_interval : row_group_intervals) {
      if (first_deleted_fragment_id.has_value()) {
        // Only the last file's intervals can follow the first deleted one.
        CHECK_EQ(last_file_path, row_group_interval.file_path);
      } else if (last_file_path == row_group_interval.file_path) {
        first_deleted_fragment_id = fragment_id;
      }
    }
    if (first_deleted_fragment_id.has_value() &&
        first_deleted_fragment_id.value() < fragment_id) {
      // Drop fragments made up entirely of the last file's row groups.
      it = fragment_to_row_group_interval_map_.erase(it);
    } else {
      it++;
    }
  }
  CHECK(first_deleted_fragment_id.has_value());

  // Record row counts of surviving fragments; clear metadata entries for
  // fragments at or beyond the first deleted one.
  std::map<int32_t, size_t> remaining_fragments_row_counts;
  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
    const auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
    if (fragment_id >= first_deleted_fragment_id.value()) {
      it = chunk_metadata_map_.erase(it);
    } else {
      auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
      if (fragment_count_it == remaining_fragments_row_counts.end()) {
        remaining_fragments_row_counts[fragment_id] = it->second->numElements;
      } else {
        CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
      }
      it++;
    }
  }

  // ... (recompute running totals from the remaining fragments)
  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
    // ...
  }

  // Trim the last file's intervals from the row groups left to scan.
  auto row_group_intervals_to_scan = get_from_map(
      fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
  auto it = std::find_if(row_group_intervals_to_scan.begin(),
                         row_group_intervals_to_scan.end(),
                         [&last_file_path](const auto& row_group_interval) {
                           return row_group_interval.file_path == last_file_path;
                         });
  CHECK(it != row_group_intervals_to_scan.end());
  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());

  if (first_deleted_fragment_id.value() > 0) {
    // ...
    const auto& last_row_group_intervals = /* ... */;
    if (last_row_group_intervals.empty()) {
      // ...
    }
  }
  // ...
  if (!row_group_intervals_to_scan.empty()) {
    metadataScanRowGroupIntervals(row_group_intervals_to_scan);
  }
}
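// Worked example (hypothetical): if fragment 3 ends partway through the last
// file and fragment 4 lies entirely within it, an append to that file erases
// fragment 4, trims fragment 3's intervals for the file, and re-scans the
// file's row groups so fragments from 3 onward are rebuilt.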
void ParquetDataWrapper::metadataScanRowGroupIntervals(
    const std::vector<RowGroupInterval>& row_group_intervals) {
  std::vector<std::string> file_paths;
  for (const auto& row_group_interval : row_group_intervals) {
    file_paths.emplace_back(row_group_interval.file_path);
  }
  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
  std::list<RowGroupMetadata> row_group_metadata;
  for (const auto& row_group_interval : row_group_intervals) {
    for (auto row_group = row_group_interval.start_index;
         row_group <= row_group_interval.end_index;
         row_group++) {
      row_group_metadata.emplace_back(get_from_map(
          row_group_metadata_map, {row_group_interval.file_path, row_group}));
    }
  }
  metadataScanRowGroupMetadata(row_group_metadata);
}
std::map<FilePathAndRowGroup, RowGroupMetadata>
ParquetDataWrapper::getRowGroupMetadataMap(
    const std::vector<std::string>& file_paths) const {
  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
  for (const auto& row_group_metadata_item : row_group_metadata) {
    row_group_metadata_map[{row_group_metadata_item.file_path,
                            row_group_metadata_item.row_group_index}] =
        row_group_metadata_item;
  }
  return row_group_metadata_map;
}
}  // namespace foreign_storage