21 #include <arrow/filesystem/localfs.h>
22 #include <boost/filesystem.hpp>
31 namespace foreign_storage {
35 std::shared_ptr<ChunkMetadata> reduce_from) {
36 CHECK(reduce_to->sqlType == reduce_from->sqlType);
37 reduce_to->numBytes += reduce_from->numBytes;
38 reduce_to->numElements += reduce_from->numElements;
39 reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;
41 auto column_type = reduce_to->sqlType;
42 column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
46 if (column_type.is_string() || column_type.is_geometry()) {
60 encoder_to->reduceStats(*encoder_from);
61 auto updated_metadata = std::make_shared<ChunkMetadata>();
62 encoder_to->getMetadata(updated_metadata);
63 reduce_to->chunkStats = updated_metadata->chunkStats;
72 , foreign_table_(foreign_table)
73 , last_fragment_index_(0)
74 , last_fragment_row_count_(0)
82 file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
103 const auto& columns =
schema_->getLogicalAndPhysicalColumns();
104 auto column_start = column_interval.
start;
105 auto column_end = column_interval.
end;
106 std::list<const ColumnDescriptor*> columns_to_init;
107 for (
const auto column : columns) {
108 auto column_id = column->columnId;
109 if (column_id >= column_start && column_id <= column_end) {
110 columns_to_init.push_back(column);
113 return columns_to_init;
117 const int fragment_index,
119 std::map<ChunkKey, AbstractBuffer*>& required_buffers,
120 const bool reserve_buffers_and_set_stats) {
124 if (column->columnType.is_varlen_indeed()) {
127 auto data_buffer = required_buffers[data_chunk_key];
129 chunk.setBuffer(data_buffer);
133 auto index_buffer = required_buffers[index_chunk_key];
135 chunk.setIndexBuffer(index_buffer);
139 auto data_buffer = required_buffers[data_chunk_key];
141 chunk.setBuffer(data_buffer);
144 if (reserve_buffers_and_set_stats) {
147 auto buffer = chunk.getBuffer();
148 auto& metadata = metadata_it->second;
149 auto encoder = buffer->getEncoder();
150 encoder->resetChunkStats(metadata->chunkStats);
151 encoder->setNumElems(metadata->numElements);
152 if (column->columnType.is_string() &&
154 auto index_buffer = chunk.getIndexBuf();
155 index_buffer->reserve(
sizeof(
StringOffsetT) * (metadata->numElements + 1));
156 }
else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
157 auto index_buffer = chunk.getIndexBuf();
158 index_buffer->reserve(
sizeof(
ArrayOffsetT) * (metadata->numElements + 1));
160 size_t num_bytes_to_reserve =
161 metadata->numElements * column->columnType.get_size();
162 buffer->reserve(num_bytes_to_reserve);
174 const auto last_fragment_entry =
186 const auto last_fragment_entry =
191 if (last_fragment_entry->second.empty()) {
195 return (last_fragment_entry->second.back().file_path != file_path);
200 const auto last_fragment_entry =
205 if (last_fragment_entry->second.empty()) {
216 std::set<std::string> new_file_paths;
220 for (
const auto& file_path : processed_file_paths) {
221 if (all_file_paths.find(file_path) == all_file_paths.end()) {
226 for (
const auto& file_path : all_file_paths) {
227 if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
228 new_file_paths.emplace(file_path);
236 if (new_file_paths.empty() && all_file_paths.size() == 1) {
237 CHECK_EQ(processed_file_paths.size(),
static_cast<size_t>(1));
238 const auto& file_path = *all_file_paths.begin();
239 CHECK_EQ(*processed_file_paths.begin(), file_path);
244 auto& reader = file_reader_cache_->at(file_path);
245 size_t row_count = reader->parquet_reader()->metadata()->num_rows();
250 new_file_paths = all_file_paths;
261 if (!new_file_paths.empty()) {
267 std::set<std::string> file_paths;
269 for (
const auto& row_group_interval : entry.second) {
270 file_paths.emplace(row_group_interval.file_path);
278 std::set<std::string> file_paths;
279 arrow::fs::FileSelector file_selector{};
281 file_selector.base_dir = base_path;
282 file_selector.recursive =
true;
284 auto file_info_result =
file_system_->GetFileInfo(file_selector);
285 if (!file_info_result.ok()) {
287 file_paths.emplace(base_path);
289 auto& file_info_vector = file_info_result.ValueOrDie();
290 for (
const auto& file_info : file_info_vector) {
291 if (file_info.type() == arrow::fs::FileType::File) {
292 file_paths.emplace(file_info.path());
295 if (file_paths.empty()) {
296 throw std::runtime_error{
"No file found at given path \"" + base_path +
"\"."};
305 auto column_interval =
307 schema_->getLogicalAndPhysicalColumns().back()->columnId};
309 for (
const auto& row_group_metadata_item : row_group_metadata) {
310 const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
311 CHECK(static_cast<int>(column_chunk_metadata.size()) ==
312 schema_->numLogicalAndPhysicalColumns());
313 auto column_chunk_metadata_iter = column_chunk_metadata.begin();
314 const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
315 int row_group = row_group_metadata_item.row_group_index;
316 const auto& file_path = row_group_metadata_item.file_path;
325 for (
int column_id = column_interval.start; column_id <= column_interval.end;
326 column_id++, column_chunk_metadata_iter++) {
327 CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
328 const auto column_descriptor =
schema_->getColumnDescriptor(column_id);
330 const auto& type_info = column_descriptor->columnType;
333 ChunkKey data_chunk_key = chunk_key;
334 if (type_info.is_varlen_indeed()) {
335 data_chunk_key.emplace_back(1);
337 std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
359 chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
364 const int logical_column_id,
365 const int fragment_id,
366 std::map<ChunkKey, AbstractBuffer*>& required_buffers) {
370 schema_->getColumnDescriptor(logical_column_id);
371 auto parquet_column_index =
schema_->getParquetColumnIndex(logical_column_id);
380 const bool is_dictionary_encoded_string_column =
386 if (is_dictionary_encoded_string_column) {
387 auto dict_descriptor = catalog->getMetadataForDictUnlocked(
389 CHECK(dict_descriptor);
390 string_dictionary = dict_descriptor->stringDict.get();
393 std::list<Chunk_NS::Chunk> chunks;
394 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
396 auto column_descriptor =
schema_->getColumnDescriptor(column_id);
398 if (column_descriptor->columnType.is_varlen_indeed()) {
401 auto buffer = required_buffers[data_chunk_key];
403 chunk.setBuffer(buffer);
406 auto index_buffer = required_buffers[index_chunk_key];
408 chunk.setIndexBuffer(index_buffer);
411 auto buffer = required_buffers[chunk_key];
413 chunk.setBuffer(buffer);
415 chunks.emplace_back(chunk);
420 row_group_intervals, parquet_column_index, chunks, string_dictionary);
423 auto metadata_iter = metadata.begin();
424 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
425 ++column_id, ++metadata_iter) {
426 auto column =
schema_->getColumnDescriptor(column_id);
428 if (column->columnType.is_varlen_indeed()) {
429 data_chunk_key.emplace_back(1);
433 auto updated_metadata = std::make_shared<ChunkMetadata>();
434 *updated_metadata = *cached_metadata;
436 if (is_dictionary_encoded_string_column ||
438 CHECK(metadata_iter != metadata.end());
439 auto& chunk_metadata_ptr = *metadata_iter;
440 updated_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
441 updated_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
443 CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
444 updated_metadata->numBytes = required_buffers[data_chunk_key]->size();
445 fragmenter->updateColumnChunkMetadata(column, fragment_id, updated_metadata);
451 std::map<ChunkKey, AbstractBuffer*>& required_buffers,
452 std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
453 CHECK(!required_buffers.empty());
456 std::set<int> logical_column_ids;
457 for (
const auto& [chunk_key, buffer] : required_buffers) {
459 CHECK_EQ(buffer->size(),
static_cast<size_t>(0));
460 const auto column_id =
462 logical_column_ids.emplace(column_id);
465 for (
const auto column_id : logical_column_ids) {
472 rapidjson::Document::AllocatorType& allocator) {
473 json_val.SetObject();
480 CHECK(json_val.IsObject());
487 const std::string& file_path)
const {
488 rapidjson::Document d;
493 "fragment_to_row_group_interval_map",
507 const std::string& file_path,
521 for (
const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
size_t last_fragment_row_count_
void finalizeFragmentMap()
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, std::map< ChunkKey, AbstractBuffer * > &required_buffers)
void populateChunkBuffers(std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
std::vector< int > ChunkKey
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
std::set< std::string > getAllFilePaths()
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
static const std::string LOCAL_FILE_STORAGE_TYPE
std::unique_ptr< ForeignTableSchema > schema_
void serializeDataWrapperInternals(const std::string &file_path) const override
#define CHUNK_KEY_FRAGMENT_IDX
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
std::map< const std::string, ReaderPtr > FileReaderMap
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr)
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
void throw_removed_row_error(const std::string &file_path)
void throw_removed_file_error(const std::string &file_path)
void resetParquetMetadata()
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
void fetchChunkMetadata()
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
int get_physical_cols() const
const ForeignTable * foreign_table_
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
rapidjson::Document read_from_file(const std::string &file_path)
void metadataScanFiles(const std::set< std::string > &file_paths)
void addNewFragment(int row_group, const std::string &file_path)
ReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
specifies the content in-memory of a row in the column metadata table
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
HOST DEVICE int get_comp_param() const
const ForeignServer * foreign_server
Encoder * getEncoder() const
bool g_enable_watchdog false
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, std::map< ChunkKey, AbstractBuffer * > &required_buffers, const bool reserve_buffers_and_set_stats=false)
#define DEBUG_TIMER(name)
bool is_dict_encoded_string() const
#define CHUNK_KEY_COLUMN_IDX
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
SQLTypeInfo get_elem_type() const
bool isRestored() const override
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
std::list< RowGroupMetadata > metadataScan(const std::set< std::string > &file_paths, const ForeignTableSchema &schema)
Perform a metadata scan for the paths specified.