#include <condition_variable>
// ...

#include <rapidjson/document.h>

#include <boost/filesystem.hpp>
// ...

namespace foreign_storage {
AbstractTextFileDataWrapper::AbstractTextFileDataWrapper()
    : db_id_(-1)
    , foreign_table_(nullptr)
    , user_mapping_(nullptr)
    , disable_cache_(false) {}

AbstractTextFileDataWrapper::AbstractTextFileDataWrapper(
    const int db_id,
    const ForeignTable* foreign_table)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , user_mapping_(nullptr)
    , disable_cache_(false) {}

AbstractTextFileDataWrapper::AbstractTextFileDataWrapper(
    const int db_id,
    const ForeignTable* foreign_table,
    const UserMapping* user_mapping,
    const bool disable_cache)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , user_mapping_(user_mapping)
    , disable_cache_(disable_cache) {}
std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
                                              const Catalog_Namespace::Catalog& catalog,
                                              const int32_t table_id,
                                              const int fragment_id) {
  CHECK(!buffers.empty());
  std::set<const ColumnDescriptor*> columns;
  for (const auto& entry : buffers) {
    CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
    const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
    const auto column = catalog.getMetadataForColumn(table_id, column_id);
    columns.emplace(column);
  }
  return columns;
}
void AbstractTextFileDataWrapper::populateChunkMapForColumns(
    const std::set<const ColumnDescriptor*>& columns,
    const int fragment_id,
    const ChunkToBufferMap& buffers,
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
  for (const auto column : columns) {
    ChunkKey data_chunk_key = {
        db_id_, foreign_table_->tableId, column->columnId, fragment_id};
    init_chunk_for_column(data_chunk_key,
                          chunk_metadata_map_,
                          buffers,
                          column_id_to_chunk_map[column->columnId]);
  }
}
void AbstractTextFileDataWrapper::populateChunkBuffers(
    const ChunkToBufferMap& required_buffers,
    const ChunkToBufferMap& optional_buffers,
    AbstractBuffer* delete_buffer) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  CHECK(!required_buffers.empty());

  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
  auto required_columns =
      get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
  populateChunkMapForColumns(
      required_columns, fragment_id, required_buffers, column_id_to_chunk_map);

  if (!optional_buffers.empty()) {
    auto optional_columns =
        get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
    populateChunkMapForColumns(
        optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
  }
  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
  updateMetadata(column_id_to_chunk_map, fragment_id);
  // ...
}
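// Note: required_buffers holds the chunks the executor explicitly requested for
// this fragment, while optional_buffers holds additional chunks that can be
// filled cheaply since the fragment's file regions are being parsed anyway.
// Both are merged into a single column_id_to_chunk_map so that
// populateChunks() performs one parsing pass over the fragment.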
void AbstractTextFileDataWrapper::updateMetadata(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  for (auto& entry : column_id_to_chunk_map) {
    const auto& column =
        catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
    if (skip_metadata_scan(column)) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_id};
      if (column->columnType.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
      // Create a fresh metadata entry so the old shared_ptr, which may still be
      // referenced by the executor, is not modified in place.
      auto cached_metadata_previous =
          get_from_map(chunk_metadata_map_, data_chunk_key);
      get_from_map(chunk_metadata_map_, data_chunk_key) =
          std::make_shared<ChunkMetadata>();
      auto cached_metadata = get_from_map(chunk_metadata_map_, data_chunk_key);
      *cached_metadata = *cached_metadata_previous;
      auto chunk_metadata =
          entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
      cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
      cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
      cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
      cached_metadata->numBytes = entry.second.getBuffer()->size();
    }
  }
}
ParseFileRegionResult parse_file_regions(
    const FileRegions& file_regions,
    const size_t start_index,
    const size_t end_index,
    FileReader& file_reader,
    ParseBufferRequest& parse_file_request,
    const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    const TextFileBufferParser& parser) {
  ParseFileRegionResult load_file_region_result{};
  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
  load_file_region_result.row_count = 0;

  ParseBufferResult result;
  for (size_t i = start_index; i <= end_index; i++) {
    CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
    auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
                                            file_regions[i].first_row_file_offset,
                                            file_regions[i].region_size);
    CHECK_EQ(file_regions[i].region_size, read_size);
    parse_file_request.begin_pos = 0;
    parse_file_request.end_pos = file_regions[i].region_size;
    parse_file_request.first_row_index = file_regions[i].first_row_index;
    parse_file_request.file_offset = file_regions[i].first_row_file_offset;
    parse_file_request.process_row_count = file_regions[i].row_count;

    result = parser.parseBuffer(parse_file_request, i == end_index);
    for (const auto& rejected_row_index : result.rejected_rows) {
      load_file_region_result.rejected_row_indices.insert(
          load_file_region_result.row_count + rejected_row_index);
    }
    load_file_region_result.row_count += result.row_count;
  }
  load_file_region_result.column_id_to_data_blocks_map =
      result.column_id_to_data_blocks_map;
  return load_file_region_result;
}
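// Rejected-row indices returned by parseBuffer() are relative to the buffer
// that was just parsed, so they are rebased by the running row_count before
// being recorded. Illustrative example: if region 0 parses 100 rows with row 7
// rejected and region 1 parses 50 rows with row 3 rejected, the result holds
// rejected_row_indices = {7, 103} and row_count = 150.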
size_t get_buffer_size(const import_export::CopyParams& copy_params,
                       const bool size_known,
                       const size_t file_size) {
  size_t buffer_size = copy_params.buffer_size;
  if (size_known && file_size < buffer_size) {
    buffer_size = file_size + 1;
  }
  return buffer_size;
}

size_t get_buffer_size(const FileRegions& file_regions) {
  size_t buffer_size = 0;
  for (const auto& file_region : file_regions) {
    buffer_size = std::max(buffer_size, file_region.region_size);
  }
  CHECK(buffer_size);
  return buffer_size;
}
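// For a file of known size smaller than the configured buffer, the buffer is
// sized to file_size + 1; the extra byte leaves room for a line delimiter in
// case the file does not end with one. The FileRegions overload instead sizes
// the buffer to the largest region, so a single allocation can hold any region
// in the fragment.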
size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const bool size_known,
                        const size_t file_size,
                        const size_t buffer_size) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count = std::thread::hardware_concurrency();
  }
  if (size_known && file_size > 0) {
    size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
    if (num_buffers_in_file < thread_count) {
      thread_count = num_buffers_in_file;
    }
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}

size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const FileRegions& file_regions) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count =
        std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}
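// Illustrative example (numbers chosen for this sketch): with a known 10 MiB
// file and a 4 MiB buffer, num_buffers_in_file = ceil(10 / 4) = 3, so at most
// 3 threads are used even on a machine with more hardware threads; there is no
// point in spawning workers that would have no buffer to parse.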
void AbstractTextFileDataWrapper::populateChunks(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id,
    AbstractBuffer* delete_buffer) {
  const auto copy_params =
      getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  CHECK(!column_id_to_chunk_map.empty());
  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
  CHECK(!file_regions.empty());

  const auto buffer_size = get_buffer_size(file_regions);
  const auto thread_count = get_thread_count(copy_params, file_regions);
  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;

  std::vector<ParseBufferRequest> parse_file_requests{};
  parse_file_requests.reserve(thread_count);
  std::vector<std::future<ParseFileRegionResult>> futures{};
  std::set<int> column_filter_set;
  for (const auto& pair : column_id_to_chunk_map) {
    column_filter_set.insert(pair.first);
  }

  std::vector<std::unique_ptr<FileReader>> file_readers;
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  rapidjson::Document d;
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  const auto file_path = getFullFilePath(foreign_table_);
  const auto& parser = getFileBufferParser();

  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
    parse_file_requests.emplace_back(buffer_size,
                                     copy_params,
                                     db_id_,
                                     foreign_table_,
                                     column_filter_set,
                                     file_path,
                                     // ...
                                     delete_buffer != nullptr);
    auto start_index = i;
    auto end_index =
        std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);

    // One reader per worker, restored from the serialized reader metadata.
    file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
        file_path, copy_params, reader_metadata));

    futures.emplace_back(std::async(std::launch::async,
                                    parse_file_regions,
                                    std::ref(file_regions),
                                    start_index,
                                    end_index,
                                    std::ref(*(file_readers.back())),
                                    std::ref(parse_file_requests.back()),
                                    std::ref(column_id_to_chunk_map),
                                    std::ref(parser)));
  }

  for (auto& future : futures) {
    future.wait();
  }

  std::vector<ParseFileRegionResult> load_file_region_results{};
  for (auto& future : futures) {
    load_file_region_results.emplace_back(future.get());
  }

  std::set<size_t> chunk_rejected_row_indices;
  size_t chunk_offset = 0;
  for (auto result : load_file_region_results) {
    for (auto& [column_id, chunk] : column_id_to_chunk_map) {
      chunk.appendData(
          result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
    }
    for (const auto& rejected_row_index : result.rejected_row_indices) {
      chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
    }
    chunk_offset += result.row_count;
  }

  if (delete_buffer) {
    auto chunk_element_count = chunk_offset;
    delete_buffer->reserve(chunk_element_count);
    for (size_t i = 0; i < chunk_element_count; ++i) {
      if (chunk_rejected_row_indices.find(i) != chunk_rejected_row_indices.end()) {
        int8_t true_byte = true;
        delete_buffer->append(&true_byte, 1);
      } else {
        int8_t false_byte = false;
        delete_buffer->append(&false_byte, 1);
      }
    }
  }
}
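// The delete buffer acts as a row-level tombstone bitmap for the fragment: one
// byte per parsed row, written in row order, with a non-zero byte marking a
// rejected row. Downstream scans treat marked rows as deleted, which is how
// rows that failed to parse are filtered out of query results.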
std::vector<size_t> partition_by_fragment(const size_t start_row_index,
                                          const size_t max_fragment_size,
                                          const size_t buffer_row_count) {
  CHECK(buffer_row_count > 0);
  std::vector<size_t> partitions{};
  size_t remaining_rows_in_last_fragment;
  if (start_row_index % max_fragment_size == 0) {
    remaining_rows_in_last_fragment = 0;
  } else {
    remaining_rows_in_last_fragment =
        max_fragment_size - (start_row_index % max_fragment_size);
  }
  if (buffer_row_count <= remaining_rows_in_last_fragment) {
    partitions.emplace_back(buffer_row_count);
  } else {
    if (remaining_rows_in_last_fragment > 0) {
      partitions.emplace_back(remaining_rows_in_last_fragment);
    }
    size_t remaining_buffer_row_count =
        buffer_row_count - remaining_rows_in_last_fragment;
    while (remaining_buffer_row_count > 0) {
      partitions.emplace_back(
          std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
      remaining_buffer_row_count -= partitions.back();
    }
  }
  return partitions;
}
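// Worked example (illustrative values): with max_fragment_size = 100,
// start_row_index = 250, and buffer_row_count = 180, the current fragment has
// room for 100 - (250 % 100) = 50 more rows, so the buffer is split into
// partitions {50, 100, 30}: 50 rows top off fragment 2, 100 rows fill
// fragment 3, and the remaining 30 rows start fragment 4.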
std::optional<ParseBufferRequest> get_next_metadata_scan_request(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> pending_requests_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_lock, [&multi_threading_params] {
        return !multi_threading_params.pending_requests.empty() ||
               !multi_threading_params.continue_processing;
      });
  if (multi_threading_params.pending_requests.empty()) {
    return {};
  }
  auto request = std::move(multi_threading_params.pending_requests.front());
  multi_threading_params.pending_requests.pop();
  pending_requests_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
  return std::move(request);
}
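// This is the consumer side of a classic condition-variable work queue: the
// wait predicate wakes the thread when either a request is available or
// continue_processing has been cleared (shutdown or error). Returning an empty
// std::optional is the signal for scan_metadata() workers to exit their loop.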
void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                     int fragment_id,
                     size_t first_row_index,
                     const ParseBufferResult& result,
                     const std::string& file_path) {
  fragment_id_to_file_regions_map[fragment_id].emplace_back(
      FileRegion(file_path,
                 result.row_offsets.front(),
                 first_row_index,
                 result.row_count,
                 result.row_offsets.back() - result.row_offsets.front()));
}

void update_stats(Encoder* encoder,
                  const SQLTypeInfo& column_type,
                  DataBlockPtr data_block,
                  const size_t row_count) {
  // ...
}
foreign_storage::ForeignStorageCache* get_cache_if_enabled(
    std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
    const bool disable_cache) {
  if (!disable_cache && catalog->getDataMgr()
                            .getPersistentStorageMgr()
                            ->getDiskCacheConfig()
                            .isEnabledForFSI()) {
    return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
  } else {
    return nullptr;
  }
}
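// Minimal usage sketch (hypothetical call site, assuming a catalog shared_ptr
// named `catalog` is in scope):
//
//   auto* cache = get_cache_if_enabled(catalog, disable_cache);
//   if (cache) {
//     // precache chunks through the disk cache
//   }
//
// A null return simply means caching is bypassed, either because the wrapper
// was constructed with disable_cache = true or because the disk cache is not
// enabled for FSI in the server configuration.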
void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
                  DataBlockPtr data_block,
                  size_t row_count,
                  ChunkKey& chunk_key,
                  const ColumnDescriptor* column,
                  bool is_first_block,
                  bool is_last_block,
                  bool disable_cache) {
  // ... (catalog lookup, cache retrieval via get_cache_if_enabled(), and
  // index_key construction elided)
  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
    cached_chunks[chunk_key] = Chunk_NS::Chunk{column};
    cached_chunks[chunk_key].setBuffer(
        cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
    if (column->columnType.is_varlen_indeed()) {
      cached_chunks[chunk_key].setIndexBuffer(
          cache->getChunkBufferForPrecaching(index_key, is_first_block));
    }
    if (is_first_block) {
      cached_chunks[chunk_key].initEncoder();
    }
  }
  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
}
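// Chunks are written into the disk cache as blocks arrive: the first block for
// a chunk creates the cached buffer (plus an index buffer for variable-length
// columns) and initializes its encoder, and every block is appended in order.
// This prepopulates the cache during the metadata scan so the first query
// against the table does not have to re-read the source file.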
void process_data_blocks(MetadataScanMultiThreadingParams& multi_threading_params,
                         int fragment_id,
                         const ParseBufferRequest& request,
                         ParseBufferResult& result,
                         std::map<int, const ColumnDescriptor*>& column_by_id,
                         std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
  add_file_region(fragment_id_to_file_regions_map,
                  fragment_id,
                  request.first_row_index,
                  result,
                  request.getFilePath());

  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
    const auto column = column_by_id[column_id];
    ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      chunk_key.emplace_back(1);
    }
    // ...
    multi_threading_params.chunk_encoder_buffers[chunk_key] =
        std::make_unique<ForeignStorageBuffer>();
    // ...
  }
}
void add_request_to_pool(MetadataScanMultiThreadingParams& multi_threading_params,
                         ParseBufferRequest& request) {
  std::unique_lock<std::mutex> completed_requests_queue_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool.emplace(std::move(request));
  completed_requests_queue_lock.unlock();
  multi_threading_params.request_pool_condition.notify_all();
}
void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
                   std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                   const TextFileBufferParser& parser) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_metadata_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      auto partitions = partition_by_fragment(
          request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
      request.begin_pos = 0;
      size_t row_index = request.first_row_index;
      for (const auto partition : partitions) {
        request.process_row_count = partition;
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        auto result = parser.parseBuffer(request, true);
        int fragment_id = row_index / request.getMaxFragRows();
        process_data_blocks(multi_threading_params,
                            fragment_id,
                            request,
                            result,
                            column_by_id,
                            fragment_id_to_file_regions_map);
        row_index += result.row_count;
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }
      add_request_to_pool(multi_threading_params, request);
    } catch (...) {
      // Let other threads know there was a failure, then stop processing.
      {
        std::lock_guard<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      add_request_to_pool(multi_threading_params, request);
      throw;
    }
  }
}
ParseBufferRequest get_request_from_pool(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> request_pool_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool_condition.wait(
      request_pool_lock,
      [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
  auto request = std::move(multi_threading_params.request_pool.front());
  multi_threading_params.request_pool.pop();
  request_pool_lock.unlock();
  CHECK(request.buffer);
  return request;
}
void dispatch_metadata_scan_request(
    MetadataScanMultiThreadingParams& multi_threading_params,
    ParseBufferRequest& request) {
  std::unique_lock<std::mutex> pending_requests_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests.emplace(std::move(request));
  pending_requests_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
}
void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
                             size_t& buffer_size,
                             const size_t alloc_size) {
  CHECK_LE(buffer_size, alloc_size);
  if (buffer_size < alloc_size) {
    buffer = std::make_unique<char[]>(alloc_size);
    buffer_size = alloc_size;
  }
}
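// Note that the old allocation is discarded rather than copied (a fresh
// allocation, not a realloc-style resize); callers are responsible for
// repopulating the buffer, as dispatch_metadata_scan_requests() does when it
// memcpys the residual bytes of the previous read to the front of the buffer.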
void dispatch_metadata_scan_requests(
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser) {
  auto alloc_size = buffer_size;
  auto residual_buffer = std::make_unique<char[]>(alloc_size);
  size_t residual_buffer_size = 0;
  size_t residual_buffer_alloc_size = alloc_size;

  while (!file_reader.isScanFinished()) {
    {
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      if (!multi_threading_params.continue_processing) {
        break;
      }
    }
    auto request = get_request_from_pool(multi_threading_params);
    resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);

    if (residual_buffer_size > 0) {
      memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
    }
    size_t size = residual_buffer_size;
    size += file_reader.read(request.buffer.get() + residual_buffer_size,
                             alloc_size - residual_buffer_size);

    if (size == 0) {
      // A zero-byte read can occur near the end of a file even before
      // isScanFinished() reports true; return the request and re-check.
      add_request_to_pool(multi_threading_params, request);
      continue;
    } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
      // A trailing newline can leave a lone line delimiter in the buffer.
      current_file_offset++;
      add_request_to_pool(multi_threading_params, request);
      continue;
    }
    unsigned int num_rows_in_buffer = 0;
    request.end_pos = parser.findRowEndPosition(alloc_size,
                                                request.buffer,
                                                size,
                                                copy_params,
                                                first_row_index_in_buffer,
                                                num_rows_in_buffer,
                                                &file_reader);
    request.buffer_size = size;
    request.buffer_alloc_size = alloc_size;
    request.first_row_index = first_row_index_in_buffer;
    request.file_offset = current_file_offset;
    request.buffer_row_count = num_rows_in_buffer;

    residual_buffer_size = size - request.end_pos;
    if (residual_buffer_size > 0) {
      resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
      memcpy(residual_buffer.get(),
             request.buffer.get() + request.end_pos,
             residual_buffer_size);
    }

    current_file_offset += request.end_pos;
    first_row_index_in_buffer += num_rows_in_buffer;

    dispatch_metadata_scan_request(multi_threading_params, request);
  }

  std::unique_lock<std::mutex> pending_requests_queue_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_queue_lock, [&multi_threading_params] {
        return multi_threading_params.pending_requests.empty() ||
               !multi_threading_params.continue_processing;
      });
  multi_threading_params.continue_processing = false;
  pending_requests_queue_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
}
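// Shape of the residual-buffer handoff, with illustrative numbers: suppose a
// 4096-byte read ends mid-row and findRowEndPosition() reports end_pos = 4000
// (the end of the last complete row). The trailing 96 bytes are copied into
// residual_buffer, the request covering bytes [0, end_pos) is dispatched, and
// the next iteration copies those 96 bytes to the front of a fresh request
// buffer before reading further data after them. This keeps every dispatched
// buffer aligned on row boundaries without re-reading from the file.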
void add_placeholder_metadata(
    const ColumnDescriptor* column,
    const ForeignTable* foreign_table,
    const int db_id,
    const size_t start_row,
    const size_t total_num_rows,
    std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
  if (column->columnType.is_varlen_indeed()) {
    chunk_key.emplace_back(1);
  }

  // Create placeholder metadata for every fragment the scanned rows touch.
  int start_fragment = start_row / foreign_table->maxFragRows;
  int end_fragment{0};
  if (total_num_rows > 0) {
    end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
  }
  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
    size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
                                               (fragment_id + 1)) > total_num_rows)
                              ? total_num_rows % foreign_table->maxFragRows
                              : foreign_table->maxFragRows;

    chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
    chunk_metadata_map[chunk_key] = get_placeholder_metadata(column, num_elements);
  }
}
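// Fragment-count arithmetic, with illustrative numbers: with maxFragRows = 100,
// start_row = 250, and total_num_rows = 430, fragments 2 through 4 receive
// placeholder metadata; the last fragment holds 430 % 100 = 30 elements while
// the earlier fragments hold the full 100.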
void AbstractTextFileDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  auto timer = DEBUG_TIMER(__func__);
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const auto copy_params =
      getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto file_path = getFullFilePath(foreign_table_);
  const auto& parser = getFileBufferParser();
  // ...
  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }

  // Only columns that cannot use placeholder metadata need to be scanned.
  std::set<int> columns_to_scan;
  for (auto column : columns) {
    if (!skip_metadata_scan(column)) {
      columns_to_scan.insert(column->columnId);
    }
  }

  MetadataScanMultiThreadingParams multi_threading_params;
  multi_threading_params.disable_cache = disable_cache_;
  // ... (buffer_size and thread_count come from the helpers defined above)

  std::vector<std::future<void>> futures{};
  for (size_t i = 0; i < thread_count; i++) {
    multi_threading_params.request_pool.emplace(buffer_size,
                                                copy_params,
                                                db_id_,
                                                foreign_table_,
                                                columns_to_scan,
                                                // ...
                                                file_path);
    futures.emplace_back(std::async(std::launch::async,
                                    scan_metadata,
                                    std::ref(multi_threading_params),
                                    std::ref(fragment_id_to_file_regions_map_),
                                    std::ref(parser)));
  }

  try {
    dispatch_metadata_scan_requests(buffer_size,
                                    file_path,
                                    (*file_reader_),
                                    copy_params,
                                    multi_threading_params,
                                    num_rows_,
                                    append_start_offset_,
                                    parser);
  } catch (...) {
    {
      std::unique_lock<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      multi_threading_params.continue_processing = false;
    }
    multi_threading_params.pending_requests_condition.notify_all();
    throw;
  }

  for (auto& future : futures) {
    // get() rather than wait() so worker exceptions propagate to this thread.
    future.get();
  }

  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
    auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
    CHECK(column_entry != column_by_id.end());
    const auto& column_type = column_entry->second->columnType;
    auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
    chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
    const auto& cached_chunks = multi_threading_params.cached_chunks;
    if (!column_type.is_varlen_indeed()) {
      chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
    } else if (auto chunk_entry = cached_chunks.find(chunk_key);
               chunk_entry != cached_chunks.end()) {
      // The cached chunk buffer knows the actual variable-length byte count.
      auto buffer = chunk_entry->second.getBuffer();
      CHECK(buffer);
      chunk_metadata->numBytes = buffer->size();
    } else {
      CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
    }
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }

  for (auto column : columns) {
    if (skip_metadata_scan(column)) {
      // Columns that were not scanned receive placeholder metadata via
      // add_placeholder_metadata() (arguments elided here).
      // ...
    }
  }

  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
  // ...
}
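// Overall orchestration of the metadata scan, as reflected above: the main
// thread seeds the request pool with one reusable buffer per worker, spawns
// scan_metadata() workers, and then runs dispatch_metadata_scan_requests() as
// the single reader that slices the file into row-aligned buffers. Workers
// parse buffers into per-chunk encoder buffers and file regions; once all
// futures complete, encoder state is folded into chunk metadata, and columns
// whose scan was skipped receive placeholder metadata instead.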
std::string AbstractTextFileDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_id_to_file_regions_map_,
                                  "fragment_id_to_file_regions_map",
                                  d.GetAllocator());

  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
  // ...
  return json_utils::write_to_string(d);
}
void AbstractTextFileDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");

  CHECK(d.HasMember("reader_metadata"));
  const auto copy_params =
      getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto full_file_path = getFullFilePath(foreign_table_);
  file_reader_ = std::make_unique<LocalMultiFileReader>(
      full_file_path, copy_params, d["reader_metadata"]);
  // ...
  for (auto& pair : chunk_metadata) {
    // ...
    chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
        pair.second->numElements);
    chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
        pair.second->chunkStats);
  }
  is_restored_ = true;
}
void AbstractTextFileDataWrapper::createRenderGroupAnalyzers() {
  // Create RenderGroupAnalyzers for poly columns.
  // ...
  for (auto const& column : columns) {
    // ...
    render_group_analyzer_map_
        .try_emplace(column->columnId,
                     std::make_unique<import_export::RenderGroupAnalyzer>());
  }
}

}  // namespace foreign_storage