21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
36 namespace foreign_storage {
40 std::shared_ptr<ChunkMetadata> reduce_from) {
41 CHECK(reduce_to->sqlType == reduce_from->sqlType);
42 reduce_to->numBytes += reduce_from->numBytes;
43 reduce_to->numElements += reduce_from->numElements;
44 reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;
46 auto column_type = reduce_to->sqlType;
47 column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
51 if (column_type.is_string() || column_type.is_geometry()) {
54 reduce_to->chunkStats.max = reduce_from->chunkStats.max;
55 reduce_to->chunkStats.min = reduce_from->chunkStats.min;
69 encoder_to->reduceStats(*encoder_from);
70 auto updated_metadata = std::make_shared<ChunkMetadata>();
71 encoder_to->getMetadata(updated_metadata);
72 reduce_to->chunkStats = updated_metadata->chunkStats;
77 : do_metadata_stats_validation_(
true), db_id_(-1), foreign_table_(nullptr) {}
80 std::shared_ptr<arrow::fs::FileSystem> file_system)
81 : do_metadata_stats_validation_(
false)
83 , foreign_table_(foreign_table)
84 , last_fragment_index_(0)
85 , last_fragment_row_count_(0)
89 , file_system_(file_system)
94 const bool do_metadata_stats_validation)
95 : do_metadata_stats_validation_(do_metadata_stats_validation)
97 , foreign_table_(foreign_table)
98 , last_fragment_index_(0)
99 , last_fragment_row_count_(0)
100 , total_row_count_(0)
102 , is_restored_(
false)
107 file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
128 const auto& columns =
schema_->getLogicalAndPhysicalColumns();
129 auto column_start = column_interval.
start;
130 auto column_end = column_interval.
end;
131 std::list<const ColumnDescriptor*> columns_to_init;
132 for (
const auto column : columns) {
133 auto column_id = column->columnId;
134 if (column_id >= column_start && column_id <= column_end) {
135 columns_to_init.push_back(column);
138 return columns_to_init;
142 const int fragment_index,
145 const bool reserve_buffers_and_set_stats) {
149 if (column->columnType.is_varlen_indeed()) {
152 CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
154 chunk.setBuffer(data_buffer);
158 CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
160 chunk.setIndexBuffer(index_buffer);
164 CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
166 chunk.setBuffer(data_buffer);
169 if (reserve_buffers_and_set_stats) {
172 auto buffer = chunk.getBuffer();
173 auto& metadata = metadata_it->second;
174 auto encoder = buffer->getEncoder();
175 encoder->resetChunkStats(metadata->chunkStats);
176 encoder->setNumElems(metadata->numElements);
177 if ((column->columnType.is_string() &&
179 column->columnType.is_geometry()) {
181 auto index_buffer = chunk.getIndexBuf();
182 index_buffer->reserve(
sizeof(
StringOffsetT) * (metadata->numElements + 1));
183 }
else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
184 auto index_buffer = chunk.getIndexBuf();
185 index_buffer->reserve(
sizeof(
ArrayOffsetT) * (metadata->numElements + 1));
187 size_t num_bytes_to_reserve =
188 metadata->numElements * column->columnType.get_size();
189 buffer->reserve(num_bytes_to_reserve);
201 const auto last_fragment_entry =
213 const auto last_fragment_entry =
218 if (last_fragment_entry->second.empty()) {
222 return (last_fragment_entry->second.back().file_path != file_path);
227 const auto last_fragment_entry =
232 if (last_fragment_entry->second.empty()) {
243 std::vector<std::string> new_file_paths;
247 for (
const auto& file_path : processed_file_paths) {
253 for (
const auto& file_path : all_file_paths) {
255 new_file_paths.emplace_back(file_path);
263 if (new_file_paths.empty() && all_file_paths.size() == 1) {
264 CHECK_EQ(processed_file_paths.size(),
static_cast<size_t>(1));
265 const auto& file_path = *all_file_paths.begin();
266 CHECK_EQ(*processed_file_paths.begin(), file_path);
271 size_t row_count = reader->parquet_reader()->metadata()->num_rows();
276 new_file_paths = all_file_paths;
287 if (!new_file_paths.empty()) {
293 std::set<std::string> file_paths;
295 for (
const auto& row_group_interval : entry.second) {
296 file_paths.emplace(row_group_interval.file_path);
304 std::vector<std::string> found_file_paths;
313 file_path, regex_pattern, sort_by, sort_regex);
317 return found_file_paths;
322 auto row_group_metadata =
324 auto column_interval =
326 schema_->getLogicalAndPhysicalColumns().back()->columnId};
328 for (
const auto& row_group_metadata_item : row_group_metadata) {
329 const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
330 CHECK(static_cast<int>(column_chunk_metadata.size()) ==
331 schema_->numLogicalAndPhysicalColumns());
332 auto column_chunk_metadata_iter = column_chunk_metadata.begin();
333 const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
334 int row_group = row_group_metadata_item.row_group_index;
335 const auto& file_path = row_group_metadata_item.file_path;
344 for (
int column_id = column_interval.start; column_id <= column_interval.end;
345 column_id++, column_chunk_metadata_iter++) {
346 CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
347 const auto column_descriptor =
schema_->getColumnDescriptor(column_id);
349 const auto& type_info = column_descriptor->columnType;
352 ChunkKey data_chunk_key = chunk_key;
353 if (type_info.is_varlen_indeed()) {
354 data_chunk_key.emplace_back(1);
356 std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
378 chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
383 const int logical_column_id,
384 const int fragment_id,
390 schema_->getColumnDescriptor(logical_column_id);
391 auto parquet_column_index =
schema_->getParquetColumnIndex(logical_column_id);
398 const auto& row_group_intervals =
401 const bool is_dictionary_encoded_string_column =
407 if (is_dictionary_encoded_string_column) {
408 auto dict_descriptor =
410 CHECK(dict_descriptor);
411 string_dictionary = dict_descriptor->stringDict.get();
414 std::list<Chunk_NS::Chunk> chunks;
415 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
417 auto column_descriptor =
schema_->getColumnDescriptor(column_id);
419 if (column_descriptor->columnType.is_varlen_indeed()) {
423 chunk.setBuffer(buffer);
427 chunk.setIndexBuffer(index_buffer);
431 chunk.setBuffer(buffer);
433 chunks.emplace_back(chunk);
436 std::unique_ptr<RejectedRowIndices> rejected_row_indices;
438 rejected_row_indices = std::make_unique<RejectedRowIndices>();
442 auto metadata = chunk_loader.
loadChunk(row_group_intervals,
443 parquet_column_index,
446 rejected_row_indices.get());
453 CHECK(!chunks.empty());
454 CHECK(chunks.begin()->getBuffer()->hasEncoder());
455 auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
458 if (delete_buffer->
size() < num_rows_in_chunk) {
459 auto remaining_rows = num_rows_in_chunk - delete_buffer->
size();
460 std::vector<int8_t> data(remaining_rows,
false);
461 delete_buffer->
append(data.data(), remaining_rows);
466 CHECK(rejected_row_indices);
467 auto delete_buffer_data = delete_buffer->
getMemoryPtr();
468 for (
const auto& rejected_index : *rejected_row_indices) {
469 CHECK_GT(delete_buffer->
size(),
static_cast<size_t>(rejected_index));
470 delete_buffer_data[rejected_index] =
true;
474 auto metadata_iter = metadata.begin();
475 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
476 ++column_id, ++metadata_iter) {
477 auto column =
schema_->getColumnDescriptor(column_id);
479 if (column->columnType.is_varlen_indeed()) {
480 data_chunk_key.emplace_back(1);
486 auto cached_metadata_previous =
489 std::make_shared<ChunkMetadata>();
491 *cached_metadata = *cached_metadata_previous;
493 CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
494 cached_metadata->numBytes =
500 CHECK(metadata_iter != metadata.end());
501 auto& chunk_metadata_ptr = *metadata_iter;
502 cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
503 cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
508 ->resetChunkStats(cached_metadata->chunkStats);
517 buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
518 buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
520 CHECK(!buffers_to_load.empty());
522 std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
523 for (
const auto& [chunk_key, buffer] : buffers_to_load) {
524 CHECK_EQ(buffer->size(),
static_cast<size_t>(0));
525 col_frag_hints.emplace(
530 std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
531 [&,
this](
const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
532 for (
const auto& [col_id, frag_id] : hint_set) {
534 col_id, frag_id, buffers_to_load, delete_buffer);
541 for (
auto& future : futures) {
545 for (
auto& future : futures) {
552 rapidjson::Document::AllocatorType& allocator) {
553 json_val.SetObject();
560 CHECK(json_val.IsObject());
567 rapidjson::Document d;
572 "fragment_to_row_group_interval_map",
585 const std::string& file_path,
599 for (
const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
613 if (file_paths.empty()) {
617 return chunk_loader.previewFiles(file_paths, num_rows);
633 for (
auto const& column : columns) {
636 .try_emplace(column->columnId,
637 std::make_unique<import_export::RenderGroupAnalyzer>())
bool contains(const T &container, const U &element)
std::string getSerializedDataWrapper() const override
size_t last_fragment_row_count_
void finalizeFragmentMap()
std::vector< int > ChunkKey
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
static const std::string REGEX_PATH_FILTER_KEY
Shared utility for globbing files. Paths can be specified as a single file, a directory, or with wildcards.
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
static const std::string LOCAL_FILE_STORAGE_TYPE
std::vector< std::string > getAllFilePaths()
void throw_removed_row_in_file_error(const std::string &file_path)
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
virtual int8_t * getMemoryPtr()=0
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::mutex delete_buffer_mutex_
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
virtual bool resetChunkStats(const ChunkStats &)
Reset chunk-level stats (min, max, nulls) using new values from the argument.
static const std::string FILE_SORT_ORDER_BY_KEY
void throw_removed_file_error(const std::string &file_path)
void resetParquetMetadata()
This file contains the class specification and related data structures for Catalog.
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
const bool do_metadata_stats_validation_
void addNewFile(const std::string &file_path)
void fetchChunkMetadata()
DataPreview getDataPreview(const size_t num_rows)
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
int get_physical_cols() const
const ForeignTable * foreign_table_
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
RenderGroupAnalyzerMap render_group_analyzer_map_
rapidjson::Document read_from_file(const std::string &file_path)
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
void addNewFragment(int row_group, const std::string &file_path)
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
An AbstractBuffer is a unit of data management for a data manager.
Specifies the in-memory content of a row in the column metadata table.
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::optional< std::string > getOption(const std::string_view &key) const
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
void metadataScanFiles(const std::vector< std::string > &file_paths)
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex, const bool recurse)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
HOST DEVICE int get_comp_param() const
const ForeignServer * foreign_server
Encoder * getEncoder() const
bool g_enable_watchdog false
#define DEBUG_TIMER(name)
std::string write_to_string(const rapidjson::Document &document)
static const std::string FILE_SORT_REGEX_KEY
bool is_dict_encoded_string() const
#define CHUNK_KEY_COLUMN_IDX
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
SQLTypeInfo get_elem_type() const
bool isRestored() const override
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
void createRenderGroupAnalyzers() override
Create RenderGroupAnalyzers for poly columns.
size_t g_max_import_threads