20 #include <condition_variable>
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
37 namespace foreign_storage {
41 : db_id_(db_id), foreign_table_(foreign_table), is_restored_(
false) {}
49 return supported_table_options;
53 std::set<std::string_view> supported_table_options(
57 return supported_table_options;
62 const std::map<ChunkKey, AbstractBuffer*>& buffers,
63 std::shared_ptr<Catalog_Namespace::Catalog> catalog,
64 const int32_t table_id,
65 const int fragment_id) {
66 CHECK(!buffers.empty());
67 std::set<const ColumnDescriptor*> columns;
68 for (
const auto& entry : buffers) {
71 const auto column = catalog->getMetadataForColumnUnlocked(table_id, column_id);
72 columns.emplace(column);
85 const std::set<const ColumnDescriptor*>& columns,
86 const int fragment_id,
87 const std::map<ChunkKey, AbstractBuffer*>& buffers,
88 std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
89 for (
const auto column : columns) {
93 if (column->columnType.is_varlen_indeed()) {
99 CHECK(buffers.find(data_chunk_key) != buffers.end());
100 CHECK(buffers.find(index_chunk_key) != buffers.end());
102 data_buffer = buffers.find(data_chunk_key)->second;
103 index_buffer = buffers.find(index_chunk_key)->second;
104 CHECK_EQ(data_buffer->size(),
static_cast<size_t>(0));
107 size_t index_offset_size{0};
108 if (column->columnType.is_string() || column->columnType.is_geometry()) {
110 }
else if (column->columnType.is_array()) {
116 index_buffer->
reserve(index_offset_size *
120 CHECK(buffers.find(data_chunk_key) != buffers.end());
121 data_buffer = buffers.find(data_chunk_key)->second;
125 column_id_to_chunk_map[column->columnId].
setBuffer(data_buffer);
126 column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
127 column_id_to_chunk_map[column->columnId].initEncoder();
132 std::map<ChunkKey, AbstractBuffer*>& required_buffers,
133 std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
137 CHECK(!required_buffers.empty());
140 std::set<const ColumnDescriptor*> required_columns =
142 std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
144 required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
146 if (!optional_buffers.empty()) {
147 std::set<const ColumnDescriptor*> optional_columns;
151 optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
155 for (
auto& entry : column_id_to_chunk_map) {
156 entry.second.setBuffer(
nullptr);
157 entry.second.setIndexBuffer(
nullptr);
163 std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
169 for (
auto& entry : column_id_to_chunk_map) {
175 if (column->columnType.is_varlen_indeed()) {
176 data_chunk_key.emplace_back(1);
180 auto chunk_metadata =
181 entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
182 cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
183 cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
184 cached_metadata->numBytes = entry.second.getBuffer()->size();
185 fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
211 const size_t start_index,
212 const size_t end_index,
215 const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
217 load_file_region_result.
file_offset = file_regions[start_index].first_row_file_offset;
218 load_file_region_result.row_count = 0;
221 for (
size_t i = start_index;
i <= end_index;
i++) {
226 file_regions[
i].first_row_file_offset,
227 file_regions[
i].region_size);
230 CHECK_EQ(file_regions[
i].region_size, read_size);
232 parse_file_request.
end_pos = file_regions[
i].region_size;
234 parse_file_request.
file_offset = file_regions[
i].first_row_file_offset;
239 load_file_region_result.row_count += result.
row_count;
241 load_file_region_result.column_id_to_data_blocks_map =
243 return load_file_region_result;
250 const bool size_known,
253 if (size_known && file_size < buffer_size) {
254 buffer_size = file_size + 1;
260 size_t buffer_size = 0;
261 for (
const auto& file_region : file_regions) {
262 buffer_size = std::max(buffer_size, file_region.region_size);
273 const bool size_known,
275 const size_t buffer_size) {
276 size_t thread_count = copy_params.
threads;
277 if (thread_count == 0) {
278 thread_count = std::thread::hardware_concurrency();
281 size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
282 if (num_buffers_in_file < thread_count) {
283 thread_count = num_buffers_in_file;
292 size_t thread_count = copy_params.
threads;
293 if (thread_count == 0) {
295 std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
302 std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
306 CHECK(!column_id_to_chunk_map.empty());
308 CHECK(!file_regions.empty());
313 const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
315 std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
316 parse_file_requests.reserve(thread_count);
317 std::vector<std::future<ParseFileRegionResult>> futures{};
318 std::set<int> column_filter_set;
319 for (
const auto& pair : column_id_to_chunk_map) {
320 column_filter_set.insert(pair.first);
323 std::vector<std::unique_ptr<CsvReader>> csv_readers;
324 rapidjson::Value reader_metadata(rapidjson::kObjectType);
325 rapidjson::Document d;
327 csv_reader_->serialize(reader_metadata, d.GetAllocator());
330 for (
size_t i = 0;
i < file_regions.size();
i += batch_size) {
331 parse_file_requests.emplace_back(buffer_size,
337 auto start_index =
i;
339 std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
342 csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
343 csv_file_path, copy_params, reader_metadata));
348 futures.emplace_back(std::async(std::launch::async,
350 std::ref(file_regions),
353 std::ref(*(csv_readers.back())),
354 std::ref(parse_file_requests.back()),
355 std::ref(column_id_to_chunk_map)));
358 std::vector<ParseFileRegionResult> load_file_region_results{};
359 for (
auto& future : futures) {
361 load_file_region_results.emplace_back(future.get());
364 for (
auto result : load_file_region_results) {
365 for (
auto& [column_id, chunk] : column_id_to_chunk_map) {
367 result.column_id_to_data_blocks_map[column_id],
result.row_count, 0);
379 const size_t max_fragment_size,
380 const size_t buffer_row_count) {
381 CHECK(buffer_row_count > 0);
382 std::vector<size_t> partitions{};
383 size_t remaining_rows_in_last_fragment;
384 if (start_row_index % max_fragment_size == 0) {
385 remaining_rows_in_last_fragment = 0;
387 remaining_rows_in_last_fragment =
388 max_fragment_size - (start_row_index % max_fragment_size);
390 if (buffer_row_count <= remaining_rows_in_last_fragment) {
391 partitions.emplace_back(buffer_row_count);
393 if (remaining_rows_in_last_fragment > 0) {
394 partitions.emplace_back(remaining_rows_in_last_fragment);
396 size_t remaining_buffer_row_count =
397 buffer_row_count - remaining_rows_in_last_fragment;
398 while (remaining_buffer_row_count > 0) {
399 partitions.emplace_back(
400 std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
401 remaining_buffer_row_count -= partitions.back();
433 std::unique_lock<std::mutex> pending_requests_lock(
436 pending_requests_lock, [&multi_threading_params] {
445 pending_requests_lock.unlock();
447 return std::move(request);
456 size_t first_row_index,
458 const std::string& file_path) {
459 fragment_id_to_file_regions_map[fragment_id].emplace_back(
474 size_t byte_count = 0;
476 for (
const auto& str : *data_block.
stringsPtr) {
477 byte_count += str.length();
479 }
else if (sql_type_info.
is_array()) {
480 for (
const auto& array : *data_block.
arraysPtr) {
481 byte_count += array.length;
496 const size_t row_count) {
507 std::shared_ptr<Catalog_Namespace::Catalog>& catalog) {
508 if (catalog->getDataMgr()
509 .getPersistentStorageMgr()
510 ->getDiskCacheConfig()
511 .isEnabledForFSI()) {
512 return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
526 bool is_last_block) {
538 if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
540 cached_chunks[chunk_key].setBuffer(
541 cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
543 cached_chunks[chunk_key].setIndexBuffer(
544 cache->getChunkBufferForPrecaching(index_key, is_first_block));
546 if (is_first_block) {
547 cached_chunks[chunk_key].initEncoder();
550 cached_chunks[chunk_key].appendData(data_block, row_count, 0);
554 std::vector<ChunkKey> key_to_cache{chunk_key};
556 key_to_cache.push_back(index_key);
558 cache->cacheTableChunks(key_to_cache);
572 std::map<int, const ColumnDescriptor*>& column_by_id,
573 std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
584 const auto column = column_by_id[column_id];
586 if (column->columnType.is_varlen_indeed()) {
587 chunk_key.emplace_back(1);
590 byte_count = column->columnType.get_size() * result.
row_count;
601 std::make_unique<ForeignStorageBuffer>();
633 std::unique_lock<std::mutex> completed_requests_queue_lock(
635 multi_threading_params.
request_pool.emplace(std::move(request));
636 completed_requests_queue_lock.unlock();
645 std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
646 std::map<int, const ColumnDescriptor*> column_by_id{};
649 if (!request_opt.has_value()) {
652 auto& request = request_opt.value();
654 if (column_by_id.empty()) {
655 for (
const auto column : request.getColumns()) {
656 column_by_id[column->columnId] = column;
660 request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
661 request.begin_pos = 0;
662 size_t row_index = request.first_row_index;
663 for (
const auto partition : partitions) {
664 request.process_row_count = partition;
665 for (
const auto& import_buffer : request.import_buffers) {
666 if (import_buffer !=
nullptr) {
667 import_buffer->clear();
671 int fragment_id = row_index / request.getMaxFragRows();
677 fragment_id_to_file_regions_map);
678 row_index +=
result.row_count;
679 request.begin_pos =
result.row_offsets.back() - request.file_offset;
684 std::lock_guard<std::mutex> pending_requests_lock(
700 std::unique_lock<std::mutex> request_pool_lock(
704 [&multi_threading_params] {
return !multi_threading_params.
request_pool.empty(); });
705 auto request = std::move(multi_threading_params.
request_pool.front());
707 request_pool_lock.unlock();
708 CHECK(request.buffer);
720 std::unique_lock<std::mutex> pending_requests_lock(
733 const size_t alloc_size) {
735 if (buffer_size < alloc_size) {
736 buffer = std::make_unique<char[]>(alloc_size);
737 buffer_size = alloc_size;
746 const size_t& buffer_size,
747 const std::string& file_path,
751 size_t& first_row_index_in_buffer,
752 size_t& current_file_offset) {
753 auto alloc_size = buffer_size;
754 auto residual_buffer = std::make_unique<char[]>(alloc_size);
755 size_t residual_buffer_size = 0;
756 size_t residual_buffer_alloc_size = alloc_size;
760 std::lock_guard<std::mutex> pending_requests_lock(
769 if (residual_buffer_size > 0) {
770 memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
772 size_t size = residual_buffer_size;
773 size += csv_reader.
read(request.buffer.get() + residual_buffer_size,
774 alloc_size - residual_buffer_size);
780 }
else if (size == 1 && request.buffer[0] == copy_params.
line_delim) {
783 current_file_offset++;
786 unsigned int num_rows_in_buffer = 0;
792 first_row_index_in_buffer,
796 request.buffer_size = size;
797 request.buffer_alloc_size = alloc_size;
798 request.first_row_index = first_row_index_in_buffer;
799 request.file_offset = current_file_offset;
800 request.buffer_row_count = num_rows_in_buffer;
802 residual_buffer_size = size - request.end_pos;
803 if (residual_buffer_size > 0) {
805 memcpy(residual_buffer.get(),
806 request.buffer.get() + request.end_pos,
807 residual_buffer_size);
810 current_file_offset += request.end_pos;
811 first_row_index_in_buffer += num_rows_in_buffer;
816 std::unique_lock<std::mutex> pending_requests_queue_lock(
819 pending_requests_queue_lock, [&multi_threading_params] {
824 pending_requests_queue_lock.unlock();
837 const size_t start_row,
838 const size_t total_num_rows,
839 std::map<
ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
842 chunk_key.emplace_back(1);
846 int start_fragment = start_row / foreign_table->
maxFragRows;
847 int end_fragment = total_num_rows / foreign_table->
maxFragRows;
848 for (
int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
849 size_t num_elements = (
static_cast<size_t>(foreign_table->
maxFragRows *
850 (fragment_id + 1)) > total_num_rows)
857 auto chunk_metadata = empty_buffer.getEncoder()->getMetadata(column->
columnType);
858 chunk_metadata->numElements = num_elements;
860 chunk_metadata->chunkStats.min.intval = std::numeric_limits<int32_t>::max();
861 chunk_metadata->chunkStats.max.intval = std::numeric_limits<int32_t>::lowest();
864 chunk_metadata_map[chunk_key] = chunk_metadata;
900 csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
908 auto columns = catalog->getAllColumnMetadataForTableUnlocked(
910 std::map<int32_t, const ColumnDescriptor*> column_by_id{};
911 for (
auto column : columns) {
912 column_by_id[column->columnId] = column;
922 std::set<int> columns_to_scan;
923 for (
auto column : columns) {
925 columns_to_scan.insert(column->columnId);
940 std::vector<std::future<void>> futures{};
941 for (
size_t i = 0;
i < thread_count;
i++) {
942 multi_threading_params.
request_pool.emplace(buffer_size,
949 futures.emplace_back(std::async(std::launch::async,
951 std::ref(multi_threading_params),
960 multi_threading_params,
965 std::unique_lock<std::mutex> pending_requests_lock(
973 for (
auto& future : futures) {
980 auto chunk_metadata =
981 buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
982 chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
983 chunk_metadata->numBytes = multi_threading_params.
chunk_byte_count[chunk_key];
987 for (
auto column : columns) {
995 chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1007 std::vector<ChunkKey> to_cache;
1008 for (
auto& [chunk_key, buffer] : multi_threading_params.
cached_chunks) {
1009 if (buffer.getBuffer()->getEncoder()->getNumElems() !=
1012 ->columnType.is_varlen_indeed()) {
1013 ChunkKey index_chunk_key = chunk_key;
1014 index_chunk_key[4] = 2;
1015 to_cache.push_back(chunk_key);
1016 to_cache.push_back(index_chunk_key);
1018 to_cache.push_back(chunk_key);
1022 if (to_cache.size() > 0) {
1023 cache->cacheTableChunks(to_cache);
1031 rapidjson::Document::AllocatorType& allocator) {
1032 json_val.SetObject();
1038 json_val, file_region.
region_size,
"region_size", allocator);
1040 json_val, file_region.
row_count,
"row_count", allocator);
1044 CHECK(json_val.IsObject());
1054 rapidjson::Document d;
1060 "fragment_id_to_file_regions_map",
1064 rapidjson::Value reader_metadata(rapidjson::kObjectType);
1065 csv_reader_->serialize(reader_metadata, d.GetAllocator());
1066 d.AddMember(
"reader_metadata", reader_metadata, d.GetAllocator());
1076 const std::string& file_path,
1079 CHECK(d.IsObject());
1086 CHECK(d.HasMember(
"reader_metadata"));
1091 csv_reader_ = std::make_unique<LocalMultiFileReader>(
1092 csv_file_path, copy_params, d[
"reader_metadata"]);
1104 for (
auto& pair : chunk_metadata) {
1113 pair.second->numElements);
1115 pair.second->chunkStats);
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool isRestored() const override
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
std::vector< int > ChunkKey
size_t get_var_length_data_block_size(DataBlockPtr data_block, SQLTypeInfo sql_type_info)
void populateChunkBuffers(std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
void serializeDataWrapperInternals(const std::string &file_path) const override
std::map< ChunkKey, size_t > chunk_byte_count_
static const std::set< std::string_view > csv_table_options_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
std::vector< ArrayDatum > * arraysPtr
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool is_last_block)
static const std::string LOCAL_FILE_STORAGE_TYPE
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks)
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define CHUNK_KEY_FRAGMENT_IDX
virtual size_t read(void *buffer, size_t max_size)=0
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
size_t first_row_file_offset
Constants for Builtin SQL Types supported by OmniSci.
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
std::unique_ptr< CsvReader > csv_reader_
std::set< std::string_view > getAllCsvTableOptions() const
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setBuffer(AbstractBuffer *b)
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
void validateTableOptions(const ForeignTable *foreign_table) const override
void validate_options(const ForeignTable *foreign_table)
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result, const std::string &file_path)
std::vector< FileRegion > FileRegions
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
rapidjson::Document read_from_file(const std::string &file_path)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
std::map< int, FileRegions > fragment_id_to_file_regions_map_
#define CHUNK_KEY_TABLE_IDX
std::vector< size_t > row_offsets
bool is_dict_encoded_type() const
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
int32_t getTableId() const
specifies the content in-memory of a row in the column metadata table
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool isAppendMode() const
Checks if the table is in append mode.
const std::set< std::string_view > & getSupportedTableOptions() const override
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void validateTableOptions(const ForeignTable *foreign_table) const override
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
virtual bool isScanFinished()=0
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
const std::set< std::string_view > & getSupportedTableOptions() const override
size_t append_start_offset_
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
bool g_enable_watchdog false
#define DEBUG_TIMER(name)
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
bool is_varlen_indeed() const
std::unique_ptr< char[]> buffer
#define CHUNK_KEY_COLUMN_IDX
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer * > &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
virtual void reserve(size_t num_bytes)=0
std::string getFilePath() const
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
size_t getMaxFragRows() const
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)
size_t file_size(const int fd)