AbstractTextFileDataWrapper.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
32 #include "FsiJsonUtils.h"
34 #include "Shared/misc.h"
35 
36 namespace foreign_storage {
38  : db_id_(-1)
39  , foreign_table_(nullptr)
40  , user_mapping_(nullptr)
41  , disable_cache_(false)
42  , is_first_file_scan_call_(true)
43  , is_file_scan_in_progress_(false)
44  , iterative_scan_last_fragment_id_(-1) {}
45 
47  const int db_id,
48  const ForeignTable* foreign_table)
49  : db_id_(db_id)
50  , foreign_table_(foreign_table)
51  , is_restored_(false)
52  , user_mapping_(nullptr)
53  , disable_cache_(false)
54  , is_first_file_scan_call_(true)
55  , is_file_scan_in_progress_(false)
56  , iterative_scan_last_fragment_id_(-1) {}
57 
59  const int db_id,
60  const ForeignTable* foreign_table,
61  const UserMapping* user_mapping,
62  const bool disable_cache)
63  : db_id_(db_id)
64  , foreign_table_(foreign_table)
65  , is_restored_(false)
66  , user_mapping_(user_mapping)
67  , disable_cache_(disable_cache)
68  , is_first_file_scan_call_(true)
69  , is_file_scan_in_progress_(false)
70  , iterative_scan_last_fragment_id_(-1) {}
71 
72 namespace {
73 
75  const int32_t fragment_id,
76  const int32_t max_fragment_id) {
78  "Attempting to populate fragment id " + std::to_string(fragment_id) +
79  " for foreign table " + table->tableName +
80  " which is greater than the maximum fragment id of " +
81  std::to_string(max_fragment_id) + "."};
82 }
83 
84 std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
85  const Catalog_Namespace::Catalog& catalog,
86  const int32_t table_id,
87  const int fragment_id) {
88  CHECK(!buffers.empty());
89  std::set<const ColumnDescriptor*> columns;
90  for (const auto& entry : buffers) {
91  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
92  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
93  const auto column = catalog.getMetadataForColumn(table_id, column_id);
94  columns.emplace(column);
95  }
96  return columns;
97 }
98 
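// Columns with dict-encoded types are skipped during the metadata scan;
// placeholder metadata is created for them instead (see add_placeholder_metadata
// below), and their chunk stats are finalized in updateMetadata() once their
// chunks have actually been populated.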
99 bool skip_metadata_scan(const ColumnDescriptor* column) {
100  return column->columnType.is_dict_encoded_type();
101 }
102 } // namespace
103 
105  const std::set<const ColumnDescriptor*>& columns,
106  const int fragment_id,
107  const ChunkToBufferMap& buffers,
108  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
109  for (const auto column : columns) {
110  ChunkKey data_chunk_key = {
111  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
112  init_chunk_for_column(data_chunk_key,
114  buffers,
115  column_id_to_chunk_map[column->columnId]);
116  }
117 }
118 
120  const ChunkToBufferMap& required_buffers,
121  const ChunkToBufferMap& optional_buffers,
122  AbstractBuffer* delete_buffer) {
123  auto timer = DEBUG_TIMER(__func__);
125  CHECK(catalog);
126  CHECK(!required_buffers.empty());
127 
128  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
129  auto required_columns =
130  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
131  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
133  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
134 
135  if (!optional_buffers.empty()) {
136  auto optional_columns =
137  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
139  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
140  }
141  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
143  updateMetadata(column_id_to_chunk_map, fragment_id);
144  }
145 }
146 
147 // if a column was skipped during the scan, update its metadata now
149  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
150  int fragment_id) {
152  CHECK(catalog);
153  for (auto& entry : column_id_to_chunk_map) {
154  const auto& column =
155  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
156  if (skip_metadata_scan(column)) {
157  ChunkKey data_chunk_key = {
158  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
159  if (column->columnType.is_varlen_indeed()) {
160  data_chunk_key.emplace_back(1);
161  }
162  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
163  // Allocate a new shared_ptr for the metadata so we don't modify the old one,
164  // which may still be in use by the executor
165  auto cached_metadata_previous =
166  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
167  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
168  std::make_shared<ChunkMetadata>();
169  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
170  *cached_metadata = *cached_metadata_previous;
171  auto chunk_metadata =
172  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
173  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
174  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
175  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
176  cached_metadata->numBytes = entry.second.getBuffer()->size();
177  }
178  }
179 }
180 
186  size_t file_offset;
187  size_t row_count;
188  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
189  std::set<size_t> rejected_row_indices;
190 
191  bool operator<(const ParseFileRegionResult& other) const {
192  return file_offset < other.file_offset;
193  }
194 };
195 
196 namespace {
197 void throw_unexpected_number_of_items(const size_t num_expected,
198  const size_t num_loaded,
199  const std::string& item_type,
200  const std::string& foreign_table_name) {
201  try {
203  num_expected, num_loaded, item_type);
204  } catch (const foreign_storage::ForeignStorageException& except) {
206  std::string(except.what()) + " Foreign table: " + foreign_table_name);
207  }
208 }
209 
210 } // namespace
211 
217  const FileRegions& file_regions,
218  const size_t start_index,
219  const size_t end_index,
220  FileReader& file_reader,
221  ParseBufferRequest& parse_file_request,
222  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
223  const TextFileBufferParser& parser) {
224  auto timer = DEBUG_TIMER(__func__);
225  ParseFileRegionResult load_file_region_result{};
226  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
227  load_file_region_result.row_count = 0;
228 
230  for (size_t i = start_index; i <= end_index; i++) {
231  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
232  auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
233  file_regions[i].first_row_file_offset,
234  file_regions[i].region_size);
235  if (file_regions[i].region_size != read_size) {
236  throw_unexpected_number_of_items(file_regions[i].region_size,
237  read_size,
238  "bytes",
239  parse_file_request.getTableName());
240  }
241  parse_file_request.begin_pos = 0;
242  parse_file_request.end_pos = file_regions[i].region_size;
243  parse_file_request.first_row_index = file_regions[i].first_row_index;
244  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
245  parse_file_request.process_row_count = file_regions[i].row_count;
246 
247  result = parser.parseBuffer(parse_file_request, i == end_index);
248  CHECK_EQ(file_regions[i].row_count, result.row_count);
249  for (const auto& rejected_row_index : result.rejected_rows) {
250  load_file_region_result.rejected_row_indices.insert(
251  load_file_region_result.row_count + rejected_row_index);
252  }
253  load_file_region_result.row_count += result.row_count;
254  }
255  load_file_region_result.column_id_to_data_blocks_map =
257  return load_file_region_result;
258 }
259 
260 namespace {
261 
265 size_t get_buffer_size(const import_export::CopyParams& copy_params,
266  const bool size_known,
267  const size_t file_size) {
268  size_t buffer_size = copy_params.buffer_size;
269  if (size_known && file_size < buffer_size) {
270  buffer_size = file_size + 1; // +1 for end-of-line character, if missing
271  }
272  return buffer_size;
273 }
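// For example, a file known to be 1000 bytes long, read with a larger
// copy_params.buffer_size, gets a 1001-byte buffer: the +1 leaves room for a
// final line delimiter in case the file does not end with one.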
274 
275 size_t get_buffer_size(const FileRegions& file_regions) {
276  size_t buffer_size = 0;
277  for (const auto& file_region : file_regions) {
278  buffer_size = std::max(buffer_size, file_region.region_size);
279  }
280  CHECK(buffer_size);
281  return buffer_size;
282 }
283 
288 size_t get_thread_count(const import_export::CopyParams& copy_params,
289  const bool size_known,
290  const size_t file_size,
291  const size_t buffer_size) {
292  size_t thread_count = copy_params.threads;
293  if (thread_count == 0) {
294  thread_count = std::thread::hardware_concurrency();
295  }
296  if (size_known && file_size > 0) {
297  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
298  if (num_buffers_in_file < thread_count) {
299  thread_count = num_buffers_in_file;
300  }
301  }
302  CHECK_GT(thread_count, static_cast<size_t>(0));
303  return thread_count;
304 }
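// For example, a 10 MB file read with 4 MB buffers spans 3 buffers, so at most
// 3 threads are used even if copy_params.threads is 0 and the machine reports
// more hardware threads.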
305 
306 size_t get_thread_count(const import_export::CopyParams& copy_params,
307  const FileRegions& file_regions) {
308  size_t thread_count = copy_params.threads;
309  if (thread_count == 0) {
310  thread_count =
311  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
312  }
313  CHECK_GT(thread_count, static_cast<size_t>(0));
314  return thread_count;
315 }
316 
318  const size_t chunk_element_count) {
319  if (delete_buffer->size() < chunk_element_count) {
320  auto remaining_rows = chunk_element_count - delete_buffer->size();
321  std::vector<int8_t> data(remaining_rows, false);
322  delete_buffer->append(data.data(), remaining_rows);
323  }
324 }
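// The delete buffer holds one int8_t flag per row in the fragment: rows are
// appended here as false (not deleted), and callers later flip the flags for
// rejected rows to true.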
325 
327  std::unique_lock<std::mutex> deferred_requests_lock(
328  multi_threading_params.deferred_requests_mutex);
329  return multi_threading_params.deferred_requests.empty();
330 }
331 
332 bool is_file_scan_finished(const FileReader* file_reader,
333  MetadataScanMultiThreadingParams& multi_threading_params) {
334  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
335 }
336 
337 } // namespace
338 
340  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
341  int fragment_id,
342  AbstractBuffer* delete_buffer) {
343  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
344 
345  CHECK(!column_id_to_chunk_map.empty());
346 
347  // check to see if an iterative scan step is required
348  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
349  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
351  // check to see if there is more foreign data to scan
354  // NOTE: we can only guarantee the current `fragment_id` has been fully
355  // covered by the iterative scan if either
356  // 1) the scan is finished OR
357  // 2) `fragment_id+1` exists in the internal map
358  // this is why `fragment_id+1` is checked for below
359  auto file_regions_it_one_ahead =
360  fragment_id_to_file_regions_map_.find(fragment_id + 1);
362  (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
363  ChunkMetadataVector chunk_metadata_vector;
364  IterativeFileScanParameters iterative_params{
365  column_id_to_chunk_map, fragment_id, delete_buffer};
366  iterativeFileScan(chunk_metadata_vector, iterative_params);
367  }
368  }
369 
370  file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
371  if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
373  is_file_scan_in_progress_ = false; // conclude the iterative scan is finished
375  true; // any subsequent iterative request can assume it is the first
377  foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
378  } else {
379  // the iterative scan is required to have loaded all required chunks, thus we
380  // can exit early
381  return;
382  }
383  }
384  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());
385 
386  const auto& file_regions = file_regions_it->second;
387 
388  // File roll off can lead to empty file regions.
389  if (file_regions.empty()) {
390  return;
391  }
392 
393  const auto buffer_size = get_buffer_size(file_regions);
394  const auto thread_count = get_thread_count(copy_params, file_regions);
395 
396  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
397 
398  std::vector<ParseBufferRequest> parse_file_requests{};
399  parse_file_requests.reserve(thread_count);
400  std::vector<std::future<ParseFileRegionResult>> futures{};
401  std::set<int> column_filter_set;
402  for (const auto& pair : column_id_to_chunk_map) {
403  column_filter_set.insert(pair.first);
404  }
405 
406  std::vector<std::unique_ptr<FileReader>> file_readers;
407  rapidjson::Value reader_metadata(rapidjson::kObjectType);
408  rapidjson::Document d;
409  auto& server_options = foreign_table_->foreign_server->options;
410  file_reader_->serialize(reader_metadata, d.GetAllocator());
411  const auto file_path = getFullFilePath(foreign_table_);
412  auto& parser = getFileBufferParser();
413 
414  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
415  parse_file_requests.emplace_back(buffer_size,
416  copy_params,
417  db_id_,
419  column_filter_set,
420  file_path,
422  delete_buffer != nullptr);
423  auto start_index = i;
424  auto end_index =
425  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
426 
427  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
428  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
429  file_path, copy_params, reader_metadata));
430  } else {
431  UNREACHABLE();
432  }
433 
434  futures.emplace_back(std::async(std::launch::async,
436  std::ref(file_regions),
437  start_index,
438  end_index,
439  std::ref(*(file_readers.back())),
440  std::ref(parse_file_requests.back()),
441  std::ref(column_id_to_chunk_map),
442  std::ref(parser)));
443  }
444 
445  for (auto& future : futures) {
446  future.wait();
447  }
448 
449  std::vector<ParseFileRegionResult> load_file_region_results{};
450  for (auto& future : futures) {
451  load_file_region_results.emplace_back(future.get());
452  }
453 
454  std::set<size_t> chunk_rejected_row_indices;
455  size_t chunk_offset = 0;
456  for (auto result : load_file_region_results) {
457  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
458  chunk.appendData(
459  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
460  }
461  for (const auto& rejected_row_index : result.rejected_row_indices) {
462  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
463  }
464  chunk_offset += result.row_count;
465  }
466 
467  if (delete_buffer) {
468  // ensure delete buffer is sized appropriately
469  resize_delete_buffer(delete_buffer, chunk_offset);
470 
471  auto delete_buffer_data = delete_buffer->getMemoryPtr();
472  for (const auto rejected_row_index : chunk_rejected_row_indices) {
473  delete_buffer_data[rejected_row_index] = true;
474  }
475  }
476 }
477 
483 size_t num_rows_to_process(const size_t start_row_index,
484  const size_t max_fragment_size,
485  const size_t rows_remaining) {
486  size_t start_position_in_fragment = start_row_index % max_fragment_size;
487  return std::min<size_t>(rows_remaining, max_fragment_size - start_position_in_fragment);
488 }
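// For example, with max_fragment_size = 100, start_row_index = 250, and
// rows_remaining = 80, the scan is 50 rows into the current fragment, so only
// min(80, 100 - 50) = 50 rows are processed before the fragment boundary.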
489 
496 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
497  const size_t max_fragment_size,
498  const size_t buffer_row_count) {
499  CHECK(buffer_row_count > 0);
500  std::vector<size_t> partitions{};
501  size_t remaining_rows_in_last_fragment;
502  if (start_row_index % max_fragment_size == 0) {
503  remaining_rows_in_last_fragment = 0;
504  } else {
505  remaining_rows_in_last_fragment =
506  max_fragment_size - (start_row_index % max_fragment_size);
507  }
508  if (buffer_row_count <= remaining_rows_in_last_fragment) {
509  partitions.emplace_back(buffer_row_count);
510  } else {
511  if (remaining_rows_in_last_fragment > 0) {
512  partitions.emplace_back(remaining_rows_in_last_fragment);
513  }
514  size_t remaining_buffer_row_count =
515  buffer_row_count - remaining_rows_in_last_fragment;
516  while (remaining_buffer_row_count > 0) {
517  partitions.emplace_back(
518  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
519  remaining_buffer_row_count -= partitions.back();
520  }
521  }
522  return partitions;
523 }
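// For example, partition_by_fragment(5, 10, 17) first emits the 5 rows that
// still fit in the current fragment, then whole fragments of at most 10 rows:
// {5, 10, 2}.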
524 
529 std::optional<ParseBufferRequest> get_next_scan_request(
530  MetadataScanMultiThreadingParams& multi_threading_params) {
531  std::unique_lock<std::mutex> pending_requests_lock(
532  multi_threading_params.pending_requests_mutex);
533  multi_threading_params.pending_requests_condition.wait(
534  pending_requests_lock, [&multi_threading_params] {
535  return !multi_threading_params.pending_requests.empty() ||
536  !multi_threading_params.continue_processing;
537  });
538  if (multi_threading_params.pending_requests.empty()) {
539  return {};
540  }
541  auto request = std::move(multi_threading_params.pending_requests.front());
542  multi_threading_params.pending_requests.pop();
543  pending_requests_lock.unlock();
544  multi_threading_params.pending_requests_condition.notify_all();
545  return std::move(request);
546 }
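// Worker threads block here until either a request is pending or
// continue_processing is cleared; an empty optional tells the worker to shut
// down its processing loop.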
547 
552 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
553  int fragment_id,
554  size_t first_row_index,
555  const ParseBufferResult& result,
556  const std::string& file_path) {
557  fragment_id_to_file_regions_map[fragment_id].emplace_back(
558  // file naming is handled by FileReader
559  FileRegion(file_path,
560  result.row_offsets.front(),
561  first_row_index,
562  result.row_count,
563  result.row_offsets.back() - result.row_offsets.front()));
564 }
565 
570 void update_stats(Encoder* encoder,
571  const SQLTypeInfo& column_type,
572  DataBlockPtr data_block,
573  const size_t row_count) {
574  if (column_type.is_array()) {
575  encoder->updateStats(data_block.arraysPtr, 0, row_count);
576  } else if (!column_type.is_varlen()) {
577  encoder->updateStats(data_block.numbersPtr, row_count);
578  } else {
579  encoder->updateStats(data_block.stringsPtr, 0, row_count);
580  }
581 }
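// DataBlockPtr carries one of three payloads, matching the branches above:
// arraysPtr for array columns, stringsPtr for other varlen (string) columns,
// and numbersPtr for fixed-width columns.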
582 namespace {
584  std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
585  const bool disable_cache) {
586  if (!disable_cache && catalog->getDataMgr()
587  .getPersistentStorageMgr()
588  ->getDiskCacheConfig()
589  .isEnabledForFSI()) {
590  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
591  } else {
592  return nullptr;
593  }
594 }
595 } // namespace
596 
597 // If cache is enabled, populate cached_chunks buffers with data blocks
598 void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
599  DataBlockPtr data_block,
600  size_t row_count,
601  ChunkKey& chunk_key,
602  const ColumnDescriptor* column,
603  bool is_first_block,
604  bool disable_cache) {
605  auto catalog =
607  CHECK(catalog);
608  auto cache = get_cache_if_enabled(catalog, disable_cache);
609  if (cache) {
610  // This extra filter needs to be here because this wrapper is the only one that
611  // accesses the cache directly, and it should not insert chunks that are not
612  // mapped to the current leaf (in distributed mode).
613  if (key_does_not_shard_to_leaf(chunk_key)) {
614  return;
615  }
616 
617  ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
618  chunk_key[CHUNK_KEY_TABLE_IDX],
619  chunk_key[CHUNK_KEY_COLUMN_IDX],
620  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
621  2};
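 // (chunk key sub-key 1 denotes the varlen data buffer, 2 the varlen index buffer)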
622  // Create actual data chunks to prepopulate cache
623  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
624  cached_chunks[chunk_key] = Chunk_NS::Chunk{column, false};
625  cached_chunks[chunk_key].setBuffer(
626  cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
627  if (column->columnType.is_varlen_indeed()) {
628  cached_chunks[chunk_key].setIndexBuffer(
629  cache->getChunkBufferForPrecaching(index_key, is_first_block));
630  }
631  if (is_first_block) {
632  cached_chunks[chunk_key].initEncoder();
633  }
634  }
635  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
636  }
637 }
638 
640  const foreign_storage::IterativeFileScanParameters& file_scan_param,
641  DataBlockPtr data_block,
642  size_t row_count,
643  const int column_id,
644  const ColumnDescriptor* column,
645  const size_t element_count_required) {
646  auto chunk = shared::get_from_map(file_scan_param.column_id_to_chunk_map, column_id);
647 
648  auto& conditional_variable = file_scan_param.getChunkConditionalVariable(column_id);
649  {
650  std::unique_lock<std::mutex> chunk_lock(file_scan_param.getChunkMutex(column_id));
651  conditional_variable.wait(chunk_lock, [element_count_required, &chunk]() {
652  return chunk.getBuffer()->getEncoder()->getNumElems() == element_count_required;
653  });
654 
655  chunk.appendData(data_block, row_count, 0);
656  }
657 
658  conditional_variable
659  .notify_all(); // notify any threads waiting on the correct element count
660 }
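// The condition variable enforces append ordering across threads: a data block
// is appended only once the chunk's encoder reports exactly
// element_count_required elements, i.e. once all earlier blocks have landed.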
661 
669 std::pair<std::map<int, DataBlockPtr>, std::map<int, DataBlockPtr>> partition_data_blocks(
670  const std::map<int, const ColumnDescriptor*>& column_by_id,
671  const std::map<int, DataBlockPtr>& data_blocks) {
672  std::map<int, DataBlockPtr> dict_encoded_data_blocks;
673  std::map<int, DataBlockPtr> none_dict_encoded_data_blocks;
674  for (auto& [column_id, data_block] : data_blocks) {
675  const auto column = shared::get_from_map(column_by_id, column_id);
676  if (column->columnType.is_dict_encoded_string()) {
677  dict_encoded_data_blocks[column_id] = data_block;
678  } else {
679  none_dict_encoded_data_blocks[column_id] = data_block;
680  }
681  }
682  return {dict_encoded_data_blocks, none_dict_encoded_data_blocks};
683 }
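// Splitting the blocks lets non-dict-encoded columns be appended immediately
// while dictionary encoding for string columns still runs asynchronously; the
// dict-encoded blocks are appended afterwards (see
// populate_chunks_using_data_blocks below).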
684 
686  const ParseBufferRequest& request,
687  const ParseBufferResult& result,
688  const foreign_storage::IterativeFileScanParameters& file_scan_param,
689  const size_t start_position_in_fragment) {
690  if (file_scan_param.delete_buffer) {
691  std::unique_lock delete_buffer_lock(file_scan_param.delete_buffer_mutex);
692  auto& delete_buffer = file_scan_param.delete_buffer;
693  auto chunk_offset = start_position_in_fragment;
694  auto chunk_element_count = chunk_offset + request.processed_row_count;
695 
696  // ensure delete buffer is sized appropriately
697  resize_delete_buffer(delete_buffer, chunk_element_count);
698 
699  auto delete_buffer_data = delete_buffer->getMemoryPtr();
700  for (const auto rejected_row_index : result.rejected_rows) {
701  CHECK(rejected_row_index + chunk_offset < delete_buffer->size());
702  delete_buffer_data[rejected_row_index + chunk_offset] = true;
703  }
704  }
705 }
706 
708  MetadataScanMultiThreadingParams& multi_threading_params,
709  int fragment_id,
710  const ParseBufferRequest& request,
712  std::map<int, const ColumnDescriptor*>& column_by_id,
713  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
714  const foreign_storage::IterativeFileScanParameters& file_scan_param,
715  const size_t expected_current_element_count) {
716  std::unique_lock<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
717  // File regions should be added in the same order as appendData
718  add_file_region(fragment_id_to_file_regions_map,
719  fragment_id,
720  request.first_row_index,
721  result,
722  request.getFilePath());
723  CHECK_EQ(fragment_id, file_scan_param.fragment_id);
724 
725  // start string encoding asynchronously
726  std::vector<std::pair<const size_t, std::future<int8_t*>>>
727  encoded_data_block_ptrs_futures;
728 
729  for (const auto& import_buffer : request.import_buffers) {
730  if (import_buffer == nullptr) {
731  continue;
732  }
733 
734  if (import_buffer->getTypeInfo().is_dict_encoded_string()) {
735  auto string_payload_ptr = import_buffer->getStringBuffer();
736  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
737 
738  auto column_id = import_buffer->getColumnDesc()->columnId;
739  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
740  column_id, std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
741  import_buffer->addDictEncodedString(*string_payload_ptr);
742  return import_buffer->getStringDictBuffer();
743  })));
744  }
745  }
746 
747  auto process_subset_of_data_blocks =
748  [&](const std::map<int, DataBlockPtr>& data_blocks) {
749  for (auto& [column_id, data_block] : data_blocks) {
750  const auto column = column_by_id[column_id];
751  lock.unlock(); // unlock the fragment-based lock in order to achieve better
752  // performance
753  append_data_block_to_chunk(file_scan_param,
754  data_block,
755  result.row_count,
756  column_id,
757  column,
758  expected_current_element_count);
759  lock.lock();
760  }
761  };
762 
763  auto [dict_encoded_data_blocks, none_dict_encoded_data_blocks] =
765 
766  process_subset_of_data_blocks(
767  none_dict_encoded_data_blocks); // skip dict string columns
768 
769  // wait for the async requests we made for the string dictionary
770  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
771  encoded_ptr_future.second.wait();
772  }
773  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
774  CHECK_GT(dict_encoded_data_blocks.count(encoded_ptr_future.first), 0UL);
775  dict_encoded_data_blocks[encoded_ptr_future.first].numbersPtr =
776  encoded_ptr_future.second.get();
777  }
778 
779  process_subset_of_data_blocks(
780  dict_encoded_data_blocks); // process only dict string columns
781 }
782 
789  int fragment_id,
790  const ParseBufferRequest& request,
792  std::map<int, const ColumnDescriptor*>& column_by_id,
793  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
794  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
795  // File regions should be added in the same order as appendData
796  add_file_region(fragment_id_to_file_regions_map,
797  fragment_id,
798  request.first_row_index,
799  result,
800  request.getFilePath());
801 
802  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
803  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
804  const auto column = column_by_id[column_id];
805  if (column->columnType.is_varlen_indeed()) {
806  chunk_key.emplace_back(1);
807  }
808  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
809  multi_threading_params.chunk_encoder_buffers.end()) {
810  multi_threading_params.chunk_encoder_buffers[chunk_key] =
811  std::make_unique<ForeignStorageBuffer>();
812  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
813  column->columnType);
814  }
815  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
816  column->columnType,
817  data_block,
818  result.row_count);
819  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
820  ->getEncoder()
821  ->getNumElems() +
822  result.row_count;
823  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
824  num_elements);
825  cache_blocks(
826  multi_threading_params.cached_chunks,
827  data_block,
828  result.row_count,
829  chunk_key,
830  column,
831  (num_elements - result.row_count) == 0, // Is the first block added to this chunk
832  multi_threading_params.disable_cache);
833  }
834 }
835 
841  ParseBufferRequest& request) {
842  std::unique_lock<std::mutex> completed_requests_queue_lock(
843  multi_threading_params.request_pool_mutex);
844  multi_threading_params.request_pool.emplace(std::move(request));
845  completed_requests_queue_lock.unlock();
846  multi_threading_params.request_pool_condition.notify_all();
847 }
848 
853 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
854  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
855  const TextFileBufferParser& parser) {
856  std::map<int, const ColumnDescriptor*> column_by_id{};
857  while (true) {
858  auto request_opt = get_next_scan_request(multi_threading_params);
859  if (!request_opt.has_value()) {
860  break;
861  }
862  auto& request = request_opt.value();
863  try {
864  if (column_by_id.empty()) {
865  for (const auto column : request.getColumns()) {
866  column_by_id[column->columnId] = column;
867  }
868  }
869  auto partitions = partition_by_fragment(
870  request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
871  request.begin_pos = 0;
872  size_t row_index = request.first_row_index;
873  for (const auto partition : partitions) {
874  request.process_row_count = partition;
875  for (const auto& import_buffer : request.import_buffers) {
876  if (import_buffer != nullptr) {
877  import_buffer->clear();
878  }
879  }
880  auto result = parser.parseBuffer(request, true);
881  int fragment_id = row_index / request.getMaxFragRows();
882  process_data_blocks(multi_threading_params,
883  fragment_id,
884  request,
885  result,
886  column_by_id,
887  fragment_id_to_file_regions_map);
888  row_index += result.row_count;
889  request.begin_pos = result.row_offsets.back() - request.file_offset;
890  }
891  } catch (...) {
892  // Re-add the request to the pool so we don't block any other threads
893  {
894  std::lock_guard<std::mutex> pending_requests_lock(
895  multi_threading_params.pending_requests_mutex);
896  multi_threading_params.continue_processing = false;
897  }
898  add_request_to_pool(multi_threading_params, request);
899  throw;
900  }
901  add_request_to_pool(multi_threading_params, request);
902  }
903 }
904 
909  MetadataScanMultiThreadingParams& multi_threading_params) {
910  std::unique_lock<std::mutex> request_pool_lock(
911  multi_threading_params.request_pool_mutex);
912  multi_threading_params.request_pool_condition.wait(
913  request_pool_lock,
914  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
915  auto request = std::move(multi_threading_params.request_pool.front());
916  multi_threading_params.request_pool.pop();
917  request_pool_lock.unlock();
918  CHECK(request.buffer);
919  return request;
920 }
921 
923  std::unique_lock<std::mutex> request_pool_lock(
924  multi_threading_params.request_pool_mutex);
925  return !multi_threading_params.request_pool.empty();
926 }
927 
928 /*
929  * Defer processing a request until the next iteration. The use case for this
930  * is an iterative file scan, where some requests must defer processing until
931  * the correct fragment is being processed.
932  */
934  ParseBufferRequest& request) {
935  std::unique_lock<std::mutex> deferred_requests_lock(
936  multi_threading_params.deferred_requests_mutex);
937  multi_threading_params.deferred_requests.emplace(std::move(request));
938 }
939 
940 /*
941  * Dispatch all requests that are currently deferred onto the pending request queue.
942  */
944  MetadataScanMultiThreadingParams& multi_threading_params) {
945  std::unique_lock<std::mutex> deferred_requests_lock(
946  multi_threading_params.deferred_requests_mutex);
947  {
948  std::unique_lock<std::mutex> pending_requests_lock(
949  multi_threading_params.pending_requests_mutex);
950 
951  while (!multi_threading_params.deferred_requests.empty()) {
952  auto& request = multi_threading_params.deferred_requests.front();
953  multi_threading_params.pending_requests.emplace(std::move(request));
954  multi_threading_params.deferred_requests.pop();
955  }
956  multi_threading_params.pending_requests_condition.notify_all();
957  }
958 }
959 
965  ParseBufferRequest& request) {
966  {
967  std::unique_lock<std::mutex> pending_requests_lock(
968  multi_threading_params.pending_requests_mutex);
969  multi_threading_params.pending_requests.emplace(std::move(request));
970  }
971  multi_threading_params.pending_requests_condition.notify_all();
972 }
973 
979  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
982  std::map<int, const ColumnDescriptor*> column_by_id{};
983  while (true) {
984  auto request_opt = get_next_scan_request(multi_threading_params);
985  if (!request_opt.has_value()) {
986  break;
987  }
988  ParseBufferRequest& request = request_opt.value();
989  try {
990  if (column_by_id.empty()) {
991  for (const auto column : request.getColumns()) {
992  column_by_id[column->columnId] = column;
993  }
994  }
996  for (size_t num_rows_left_to_process =
997  request.buffer_row_count - request.processed_row_count;
998  num_rows_left_to_process > 0;
999  num_rows_left_to_process =
1000  request.buffer_row_count - request.processed_row_count) {
1001  // NOTE: `request.begin_pos` state is required to be set correctly by this point
1002  // in execution
1003  size_t row_index = request.first_row_index + request.processed_row_count;
1004  int fragment_id = row_index / request.getMaxFragRows();
1005  if (fragment_id >
1006  file_scan_param.fragment_id) { // processing must continue next iteration
1007  defer_scan_request(multi_threading_params, request);
1008  return;
1009  }
1011  row_index, request.getMaxFragRows(), num_rows_left_to_process);
1012  for (const auto& import_buffer : request.import_buffers) {
1013  if (import_buffer != nullptr) {
1014  import_buffer->clear();
1015  }
1016  }
1017  auto result = parser.parseBuffer(request, true, true, true);
1018  size_t start_position_in_fragment = row_index % request.getMaxFragRows();
1019  populate_chunks_using_data_blocks(multi_threading_params,
1020  fragment_id,
1021  request,
1022  result,
1023  column_by_id,
1024  fragment_id_to_file_regions_map,
1025  file_scan_param,
1026  start_position_in_fragment);
1027 
1028  request.processed_row_count += result.row_count;
1029  request.begin_pos = result.row_offsets.back() - request.file_offset;
1030 
1032  request, result, file_scan_param, start_position_in_fragment);
1033  }
1034 
1035  } catch (...) {
1036  // Re-add the request to the pool so we don't block any other threads
1037  {
1038  std::lock_guard<std::mutex> pending_requests_lock(
1039  multi_threading_params.pending_requests_mutex);
1040  multi_threading_params.continue_processing = false;
1041  }
1042  add_request_to_pool(multi_threading_params, request);
1043  throw;
1044  }
1045  add_request_to_pool(multi_threading_params, request);
1046  }
1047 }
1048 
1053 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
1054  size_t& buffer_size,
1055  const size_t alloc_size) {
1056  CHECK_LE(buffer_size, alloc_size);
1057  if (buffer_size < alloc_size) {
1058  buffer = std::make_unique<char[]>(alloc_size);
1059  buffer_size = alloc_size;
1060  }
1061 }
1062 
1064  foreign_storage::MetadataScanMultiThreadingParams& multi_threading_params) {
1065  multi_threading_params.request_pool = {};
1066  multi_threading_params.cached_chunks = {};
1067  multi_threading_params.pending_requests = {};
1068  multi_threading_params.deferred_requests = {};
1069  multi_threading_params.chunk_encoder_buffers.clear();
1070 }
1071 
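// Reads the file in buffer-sized chunks and turns each chunk into a parse
// request. Bytes after the last complete row in a buffer are carried over in
// iterative_residual_buffer and prepended to the next read, so rows split
// across reads are never handed to the parser in pieces.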
1077  const foreign_storage::ForeignTable* table,
1078  const size_t& buffer_size,
1079  const std::string& file_path,
1080  FileReader& file_reader,
1081  const import_export::CopyParams& copy_params,
1082  MetadataScanMultiThreadingParams& multi_threading_params,
1083  size_t& first_row_index_in_buffer,
1084  size_t& current_file_offset,
1086  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1088  iterative_residual_buffer,
1089  const bool is_first_file_scan_call,
1090  int& iterative_scan_last_fragment_id) {
1091  auto& alloc_size = iterative_residual_buffer.alloc_size;
1092  auto& residual_buffer = iterative_residual_buffer.residual_data;
1093  auto& residual_buffer_size = iterative_residual_buffer.residual_buffer_size;
1094  auto& residual_buffer_alloc_size = iterative_residual_buffer.residual_buffer_alloc_size;
1095 
1096  if (is_first_file_scan_call) {
1097  alloc_size = buffer_size;
1098  residual_buffer = std::make_unique<char[]>(alloc_size);
1099  residual_buffer_size = 0;
1100  residual_buffer_alloc_size = alloc_size;
1101  } else if (!no_deferred_requests(multi_threading_params)) {
1102  dispatch_all_deferred_requests(multi_threading_params);
1103  }
1104 
1105  // NOTE: During an iterative scan, it is possible for an entire fragment to
1106  // be parsed into requests, which sit in deferred requests; in order to avoid
1107  // stalling indefinitely while waiting on an available request, the check
1108  // below determines if this is the case or not.
1109  bool current_fragment_fully_read_during_iterative_scan =
1110  file_scan_param && file_scan_param->fragment_id < iterative_scan_last_fragment_id;
1111 
1112  // NOTE: The conditional below behaves as follows:
1113  //
1114  // * for non-iterative scans,
1115  // current_fragment_fully_read_during_iterative_scan is false, and therefore
1116  // only the first clause of the conditional is relevant
1117  //
1118  // * for iterative scans, if
1119  // current_fragment_fully_read_during_iterative_scan is true, then the
1120  // conditional is skipped, unless it is determined there are still available
1121  // requests to work with, in which case the loop is entered; this is an
1122  // optimization that ensures maximal concurrency of loading while processing
1123  // requests
1124  while (!file_reader.isScanFinished() &&
1125  (request_pool_non_empty(multi_threading_params) ||
1126  !current_fragment_fully_read_during_iterative_scan)) {
1127  {
1128  std::lock_guard<std::mutex> pending_requests_lock(
1129  multi_threading_params.pending_requests_mutex);
1130  if (!multi_threading_params.continue_processing) {
1131  break;
1132  }
1133  }
1134  auto request = get_request_from_pool(multi_threading_params);
1135  request.full_path = file_reader.getCurrentFilePath();
1136  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
1137 
1138  if (residual_buffer_size > 0) {
1139  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
1140  }
1141  size_t size = residual_buffer_size;
1142  size += file_reader.read(request.buffer.get() + residual_buffer_size,
1143  alloc_size - residual_buffer_size);
1144 
1145  if (size == 0) {
1146  // In some cases at the end of a file we will read 0 bytes even when
1147  // file_reader.isScanFinished() is false. Add the request back to the pool to be
1148  // picked up again in the next iteration.
1149  add_request_to_pool(multi_threading_params, request);
1150  continue;
1151  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
1152  // In some cases files with newlines at the end will be encoded with a second
1153  // newline that can end up being the only thing in the buffer. Add the request
1154  // back to the pool to be picked up again in the next iteration.
1155  current_file_offset++;
1156  add_request_to_pool(multi_threading_params, request);
1157  continue;
1158  }
1159  unsigned int num_rows_in_buffer = 0;
1160  request.end_pos = parser.findRowEndPosition(alloc_size,
1161  request.buffer,
1162  size,
1163  copy_params,
1164  first_row_index_in_buffer,
1165  num_rows_in_buffer,
1166  &file_reader);
1167  request.buffer_size = size;
1168  request.buffer_alloc_size = alloc_size;
1169  request.first_row_index = first_row_index_in_buffer;
1170  request.file_offset = current_file_offset;
1171  request.buffer_row_count = num_rows_in_buffer;
1172  request.processed_row_count = 0;
1173  request.begin_pos = 0;
1174 
1175  residual_buffer_size = size - request.end_pos;
1176  if (residual_buffer_size > 0) {
1177  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
1178  memcpy(residual_buffer.get(),
1179  request.buffer.get() + request.end_pos,
1180  residual_buffer_size);
1181  }
1182 
1183  current_file_offset += request.end_pos;
1184  first_row_index_in_buffer += num_rows_in_buffer;
1185 
1186  if (num_rows_in_buffer > 0) {
1187  dispatch_scan_request(multi_threading_params, request);
1188  } else {
1189  add_request_to_pool(multi_threading_params, request);
1190  }
1191 
1192  if (file_scan_param) {
1193  const int32_t last_fragment_index =
1194  (first_row_index_in_buffer) / table->maxFragRows;
1195  if (last_fragment_index > file_scan_param->fragment_id) {
1196  iterative_scan_last_fragment_id = last_fragment_index;
1197  break;
1198  }
1199  }
1200  }
1201 
1202  std::unique_lock<std::mutex> pending_requests_queue_lock(
1203  multi_threading_params.pending_requests_mutex);
1204  multi_threading_params.pending_requests_condition.wait(
1205  pending_requests_queue_lock, [&multi_threading_params] {
1206  return multi_threading_params.pending_requests.empty() ||
1207  (multi_threading_params.continue_processing == false);
1208  });
1209  multi_threading_params.continue_processing = false;
1210  pending_requests_queue_lock.unlock();
1211  multi_threading_params.pending_requests_condition.notify_all();
1212 }
1213 
1215  const foreign_storage::ForeignTable* table,
1216  const size_t& buffer_size,
1217  const std::string& file_path,
1218  FileReader& file_reader,
1219  const import_export::CopyParams& copy_params,
1220  MetadataScanMultiThreadingParams& multi_threading_params,
1221  size_t& first_row_index_in_buffer,
1222  size_t& current_file_offset,
1224  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1226  iterative_residual_buffer,
1227  const bool is_first_file_scan_call,
1228  int& iterative_scan_last_fragment_id) {
1229  try {
1230  dispatch_scan_requests(table,
1231  buffer_size,
1232  file_path,
1233  file_reader,
1234  copy_params,
1235  multi_threading_params,
1236  first_row_index_in_buffer,
1237  current_file_offset,
1238  parser,
1239  file_scan_param,
1240  iterative_residual_buffer,
1241  is_first_file_scan_call,
1242  iterative_scan_last_fragment_id);
1243  } catch (...) {
1244  {
1245  std::unique_lock<std::mutex> pending_requests_lock(
1246  multi_threading_params.pending_requests_mutex);
1247  multi_threading_params.continue_processing = false;
1248  }
1249  multi_threading_params.pending_requests_condition.notify_all();
1250  throw;
1251  }
1252 }
1253 
1255  const foreign_storage::ForeignTable* table,
1256  const size_t& buffer_size,
1257  const std::string& file_path,
1258  FileReader& file_reader,
1259  const import_export::CopyParams& copy_params,
1260  MetadataScanMultiThreadingParams& multi_threading_params,
1261  size_t& first_row_index_in_buffer,
1262  size_t& current_file_offset,
1264  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1266  iterative_residual_buffer,
1267  const bool is_first_file_scan_call) {
1268  int dummy;
1270  buffer_size,
1271  file_path,
1272  file_reader,
1273  copy_params,
1274  multi_threading_params,
1275  first_row_index_in_buffer,
1276  current_file_offset,
1277  parser,
1278  file_scan_param,
1279  iterative_residual_buffer,
1280  is_first_file_scan_call,
1281  dummy);
1282 }
1283 
1284 namespace {
1285 // Create metadata for unscanned columns.
1286 // Any fragments with updated rows between start_row and total_num_rows will be
1287 // updated. Chunks prior to start_row will be restored (i.e., for append
1288 // workflows).
1290  const ColumnDescriptor* column,
1291  const ForeignTable* foreign_table,
1292  const int db_id,
1293  const size_t start_row,
1294  const size_t total_num_rows,
1295  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
1296  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
1297  if (column->columnType.is_varlen_indeed()) {
1298  chunk_key.emplace_back(1);
1299  }
1300 
1301  // Create placeholder metadata for every fragment touched by this scan
1302  int start_fragment = start_row / foreign_table->maxFragRows;
1303  int end_fragment{0};
1304  if (total_num_rows > 0) {
1305  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
1306  }
1307  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
1308  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
1309  (fragment_id + 1)) > total_num_rows)
1310  ? total_num_rows % foreign_table->maxFragRows
1311  : foreign_table->maxFragRows;
1312 
1313  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
1314  chunk_metadata_map[chunk_key] =
1315  get_placeholder_metadata(column->columnType, num_elements);
1316  }
1317 }
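// For example, with maxFragRows = 100, start_row = 250, and total_num_rows =
// 420, placeholder metadata is written for fragments 2 through 4, sized 100,
// 100, and 20 elements respectively.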
1318 
1320  const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
1321  const std::map<int, FileRegions>& fragment_id_to_file_regions_map,
1322  const foreign_storage::OptionsMap& server_options,
1323  std::unique_ptr<FileReader>& file_reader,
1324  const std::string& file_path,
1325  const import_export::CopyParams& copy_params,
1326  const shared::FilePathOptions& file_path_options,
1327  const std::optional<size_t>& max_file_count,
1328  const foreign_storage::ForeignTable* foreign_table,
1329  const foreign_storage::UserMapping* user_mapping,
1331  std::function<std::string()> get_s3_key,
1332  size_t& num_rows,
1333  size_t& append_start_offset) {
1334  // Should only be called once for non-append tables
1335  CHECK(chunk_metadata_map.empty());
1336  CHECK(fragment_id_to_file_regions_map.empty());
1338  ->second ==
1340  file_reader = std::make_unique<LocalMultiFileReader>(
1341  file_path, copy_params, file_path_options, max_file_count);
1342  } else {
1343  UNREACHABLE();
1344  }
1345  parser.validateFiles(file_reader.get(), foreign_table);
1346  num_rows = 0;
1347  append_start_offset = 0;
1348 }
1349 } // namespace
1350 
1364  ChunkMetadataVector& chunk_metadata_vector) {
1365  auto timer = DEBUG_TIMER(__func__);
1366 
1367  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1368  const auto file_path = getFullFilePath(foreign_table_);
1370  CHECK(catalog);
1371  auto& parser = getFileBufferParser();
1372  const auto file_path_options = getFilePathOptions(foreign_table_);
1373  auto& server_options = foreign_table_->foreign_server->options;
1374  std::set<std::string> rolled_off_files;
1375  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
1376  auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
1377  if (allowFileRollOff(foreign_table_) && multi_file_reader) {
1378  rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
1379  }
1380  parser.validateFiles(file_reader_.get(), foreign_table_);
1381  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1382  file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
1383  } else {
1384  UNREACHABLE();
1385  }
1386  } else {
1390  server_options,
1391  file_reader_,
1392  file_path,
1393  copy_params,
1394  file_path_options,
1395  getMaxFileCount(),
1397  user_mapping_,
1398  parser,
1399  [] { return ""; },
1400  num_rows_,
1402  }
1403 
1404  auto columns =
1405  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1406  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1407  for (auto column : columns) {
1408  column_by_id[column->columnId] = column;
1409  }
1410  MetadataScanMultiThreadingParams multi_threading_params;
1411  multi_threading_params.disable_cache = disable_cache_;
1412 
1413  // Restore previous chunk data
1414  if (foreign_table_->isAppendMode()) {
1415  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
1416  }
1417 
1418  std::set<int> columns_to_scan;
1419  for (auto column : columns) {
1420  if (!skip_metadata_scan(column)) {
1421  columns_to_scan.insert(column->columnId);
1422  }
1423  }
1424 
1425  // Track where scan started for appends
1426  int start_row = num_rows_;
1427  if (!file_reader_->isScanFinished()) {
1428  auto buffer_size = get_buffer_size(copy_params,
1429  file_reader_->isRemainingSizeKnown(),
1430  file_reader_->getRemainingSize());
1431  auto thread_count = get_thread_count(copy_params,
1432  file_reader_->isRemainingSizeKnown(),
1433  file_reader_->getRemainingSize(),
1434  buffer_size);
1435  multi_threading_params.continue_processing = true;
1436 
1437  std::vector<std::future<void>> futures{};
1438  for (size_t i = 0; i < thread_count; i++) {
1439  multi_threading_params.request_pool.emplace(buffer_size,
1440  copy_params,
1441  db_id_,
1443  columns_to_scan,
1445  nullptr,
1446  disable_cache_);
1447 
1448  futures.emplace_back(std::async(std::launch::async,
1449  scan_metadata,
1450  std::ref(multi_threading_params),
1452  std::ref(parser)));
1453  }
1454 
1455  ResidualBuffer residual_buffer;
1457  buffer_size,
1458  file_path,
1459  (*file_reader_),
1460  copy_params,
1461  multi_threading_params,
1462  num_rows_,
1465  nullptr,
1466  residual_buffer,
1467  true);
1468 
1469  for (auto& future : futures) {
1470  // get() instead of wait() because we need to propagate potential exceptions.
1471  future.get();
1472  }
1473  }
1474 
1475  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1476  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
1477  CHECK(column_entry != column_by_id.end());
1478  const auto& column_type = column_entry->second->columnType;
1479  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
1480  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1481  const auto& cached_chunks = multi_threading_params.cached_chunks;
1482  if (!column_type.is_varlen_indeed()) {
1483  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
1484  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
1485  chunk_entry != cached_chunks.end()) {
1486  auto buffer = chunk_entry->second.getBuffer();
1487  CHECK(buffer);
1488  chunk_metadata->numBytes = buffer->size();
1489  } else {
1490  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
1491  }
1492  chunk_metadata_map_[chunk_key] = chunk_metadata;
1493  }
1494 
1495  for (auto column : columns) {
1496  if (skip_metadata_scan(column)) {
1498  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1499  }
1500  }
1501 
1502  if (!rolled_off_files.empty()) {
1503  updateRolledOffChunks(rolled_off_files, column_by_id);
1504  }
1505 
1506  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1507  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1508  }
1509 
1510  // Save chunk data
1511  if (foreign_table_->isAppendMode()) {
1512  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1513  }
1514 } // namespace foreign_storage
1515 
1517  ChunkMetadataVector& chunk_metadata_vector,
1518  IterativeFileScanParameters& file_scan_param) {
1519  auto timer = DEBUG_TIMER(__func__);
1520 
1522 
1524  << " iterative file scan can not be used with APPEND mode.";
1525 
1526  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1527  const auto file_path = getFullFilePath(foreign_table_);
1529  CHECK(catalog);
1530  auto& parser = getFileBufferParser();
1531  const auto file_path_options = getFilePathOptions(foreign_table_);
1532  auto& server_options = foreign_table_->foreign_server->options;
1533 
1539  server_options,
1540  file_reader_,
1541  file_path,
1542  copy_params,
1543  file_path_options,
1544  getMaxFileCount(),
1546  user_mapping_,
1547  parser,
1548  [] { return ""; },
1549  num_rows_,
1551  }
1552 
1553  auto columns =
1554  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1555  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1556  for (auto column : columns) {
1557  column_by_id[column->columnId] = column;
1558  }
1559 
1560  if (is_first_file_scan_call_) { // reinitialize all members that may have state in
1561  // `multi_threading_params_`
1563  }
1564 
1566 
1567  std::set<int> columns_to_scan;
1568  for (auto column : columns) {
1569  columns_to_scan.insert(column->columnId);
1570  }
1571 
1574  // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
1575  // scan
1576  buffer_size_ = get_buffer_size(copy_params,
1577  file_reader_->isRemainingSizeKnown(),
1578  file_reader_->getRemainingSize());
1579  thread_count_ = get_thread_count(copy_params,
1580  file_reader_->isRemainingSizeKnown(),
1581  file_reader_->getRemainingSize(),
1582  buffer_size_);
1583  }
1585 
1586  std::vector<std::future<void>> futures{};
1587  for (size_t i = 0; i < thread_count_; i++) {
1590  copy_params,
1591  db_id_,
1593  columns_to_scan,
1596  true);
1597  }
1598  futures.emplace_back(std::async(std::launch::async,
1600  std::ref(multi_threading_params_),
1602  std::ref(parser),
1603  std::ref(file_scan_param)));
1604  }
1605 
1607  buffer_size_,
1608  file_path,
1609  (*file_reader_),
1610  copy_params,
1612  num_rows_,
1615  &file_scan_param,
1619 
1620  for (auto& future : futures) {
1621  // get() instead of wait() because we need to propagate potential exceptions.
1622  future.get();
1623  }
1624  }
1625 
1627  is_first_file_scan_call_ = false;
1628  }
1629 
1632  }
1633 }
1634 
1636  const std::set<std::string>& rolled_off_files,
1637  const std::map<int32_t, const ColumnDescriptor*>& column_by_id) {
1638  std::set<int32_t> deleted_fragment_ids;
1639  std::optional<int32_t> partially_deleted_fragment_id;
1640  std::optional<size_t> partially_deleted_fragment_row_count;
1641  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
1642  bool file_region_deleted{false};
1643  for (auto it = file_regions.begin(); it != file_regions.end();) {
1644  if (shared::contains(rolled_off_files, it->file_path)) {
1645  it = file_regions.erase(it);
1646  file_region_deleted = true;
1647  } else {
1648  it++;
1649  }
1650  }
1651  if (file_regions.empty()) {
1652  deleted_fragment_ids.emplace(fragment_id);
1653  } else if (file_region_deleted) {
1654  partially_deleted_fragment_id = fragment_id;
1655  partially_deleted_fragment_row_count = 0;
1656  for (const auto& file_region : file_regions) {
1657  partially_deleted_fragment_row_count.value() += file_region.row_count;
1658  }
1659  break;
1660  }
1661  }
1662 
1663  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1664  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
1665  chunk_metadata->numElements = 0;
1666  chunk_metadata->numBytes = 0;
1667  } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
1668  CHECK(partially_deleted_fragment_row_count.has_value());
1669  auto old_chunk_stats = chunk_metadata->chunkStats;
1670  auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
1671  chunk_metadata = get_placeholder_metadata(
1672  cd->columnType, partially_deleted_fragment_row_count.value());
1673  // Old chunk stats will still be correct (since only row deletion is occurring)
1674  // and more accurate than that of the placeholder metadata.
1675  chunk_metadata->chunkStats = old_chunk_stats;
1676  }
1677  }
1678 }
1679 
1681  rapidjson::Document d;
1682  d.SetObject();
1683 
1684  // Save fragment map
1687  "fragment_id_to_file_regions_map",
1688  d.GetAllocator());
1689 
1690  // Save reader metadata
1691  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1692  file_reader_->serialize(reader_metadata, d.GetAllocator());
1693  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1694 
1695  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1697  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1698 
1699  return json_utils::write_to_string(d);
1700 }
1701 
1703  const std::string& file_path,
1704  const ChunkMetadataVector& chunk_metadata) {
1705  auto d = json_utils::read_from_file(file_path);
1706  CHECK(d.IsObject());
1707 
1708  // Restore fragment map
1710  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1711 
1712 // Construct reader with metadata
1713  CHECK(d.HasMember("reader_metadata"));
1714  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1715  const auto full_file_path = getFullFilePath(foreign_table_);
1716  auto& server_options = foreign_table_->foreign_server->options;
1717  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1718  file_reader_ = std::make_unique<LocalMultiFileReader>(
1719  full_file_path, copy_params, d["reader_metadata"]);
1720  } else {
1721  UNREACHABLE();
1722  }
1723 
1724  json_utils::get_value_from_object(d, num_rows_, "num_rows");
1725  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1726 
1727  // Now restore the internal metadata maps
1728  CHECK(chunk_metadata_map_.empty());
1729  CHECK(chunk_encoder_buffers_.empty());
1730 
1731  for (auto& pair : chunk_metadata) {
1732  chunk_metadata_map_[pair.first] = pair.second;
1733 
1734  if (foreign_table_->isAppendMode()) {
1735  // Restore encoder state for append mode
1736  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1737  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1738  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1739  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1740  pair.second->numElements);
1741  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1742  pair.second->chunkStats);
1743  chunk_encoder_buffers_[pair.first]->setUpdated();
1744  }
1745  }
1746  is_restored_ = true;
1747 }
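// [Editorial sketch, not part of the original source file.] Restoration is the
// mirror image of serialization: reparse the persisted JSON, rebuild the file
// reader from "reader_metadata", then, in append mode, prime one encoder per
// chunk with the persisted size, element count, and stats so the next scan
// appends rather than restarting. Reading the scalar members back with raw
// rapidjson (json_utils::get_value_from_object wraps this kind of access):

#include <rapidjson/document.h>
#include <cstdint>

bool restore_scalars_example(const rapidjson::Document& d,
                             uint64_t& num_rows,
                             uint64_t& append_start_offset) {
  if (!d.IsObject() || !d.HasMember("num_rows") ||
      !d.HasMember("append_start_offset")) {
    return false;  // malformed persisted state
  }
  num_rows = d["num_rows"].GetUint64();
  append_start_offset = d["append_start_offset"].GetUint64();
  return true;
}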
1748 
1749 bool AbstractTextFileDataWrapper::isRestored() const {
1750  return is_restored_;
1751 }
1752 
1753 // declared in three derived classes to avoid
1754 // polluting ForeignDataWrapper virtual base
1755 // @TODO refactor to lower class if needed
1756 void AbstractTextFileDataWrapper::createRenderGroupAnalyzers() {
1757  // must have these
1758  CHECK_GE(db_id_, 0);
1759  CHECK(foreign_table_);
1760 
1761  // populate map for all poly columns in this table
1762  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
1763  CHECK(catalog);
1764  auto columns =
1765  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1766  for (auto const& column : columns) {
1767  if (IS_GEO_POLY(column->columnType.get_type())) {
1768  CHECK(render_group_analyzer_map_
1769  .try_emplace(column->columnId,
1770  std::make_unique<import_export::RenderGroupAnalyzer>())
1771  .second);
1772  }
1773  }
1774 }
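// [Editorial sketch, not part of the original source file.] The CHECK around
// try_emplace above doubles as a uniqueness assertion: std::map::try_emplace
// returns an {iterator, bool} pair whose bool is false when the key is
// already present, in which case the map is left unchanged. Standalone
// illustration:

#include <map>
#include <memory>

void try_emplace_uniqueness_example() {
  std::map<int, std::unique_ptr<int>> m;
  bool inserted = m.try_emplace(1, std::make_unique<int>(10)).second;   // true
  bool duplicate = m.try_emplace(1, std::make_unique<int>(20)).second;  // false
  (void)inserted;
  (void)duplicate;
}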
1775 
1776 std::optional<size_t> AbstractTextFileDataWrapper::getMaxFileCount() const {
1777  return {};
1778 }
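// [Editorial sketch, not part of the original source file.] An empty
// std::optional from getMaxFileCount() is read as "no limit on the number of
// files"; derived wrappers that only support a bounded file set can override
// it with a concrete count. A typical call-site pattern, with hypothetical
// names:
//
// if (const auto max_file_count = wrapper.getMaxFileCount()) {
//   enforce_limit(*max_file_count);  // hypothetical enforcement hook
// }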
1779 
1780 } // namespace foreign_storage