// AbstractTextFileDataWrapper.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "AbstractTextFileDataWrapper.h"

#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>

#include <rapidjson/document.h>
#include <boost/filesystem.hpp>

#include "Catalog/Catalog.h"
#include "DataMgr/ForeignStorage/FileReader.h"
#include "FsiJsonUtils.h"
#include "ImportExport/RenderGroupAnalyzer.h"
#include "Shared/misc.h"

namespace foreign_storage {
AbstractTextFileDataWrapper::AbstractTextFileDataWrapper()
    : db_id_(-1)
    , foreign_table_(nullptr)
    , user_mapping_(nullptr)
    , disable_cache_(false)
    , is_first_file_scan_call_(true)
    , is_file_scan_in_progress_(false) {}
44 
46  const int db_id,
47  const ForeignTable* foreign_table)
48  : db_id_(db_id)
49  , foreign_table_(foreign_table)
50  , is_restored_(false)
51  , user_mapping_(nullptr)
52  , disable_cache_(false)
53  , is_first_file_scan_call_(true)
54  , is_file_scan_in_progress_(false) {}
55 

AbstractTextFileDataWrapper::AbstractTextFileDataWrapper(
    const int db_id,
    const ForeignTable* foreign_table,
    const UserMapping* user_mapping,
    const bool disable_cache)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , is_restored_(false)
    , user_mapping_(user_mapping)
    , disable_cache_(disable_cache)
    , is_first_file_scan_call_(true)
    , is_file_scan_in_progress_(false) {}

namespace {

void throw_fragment_id_out_of_bounds_error(const TableDescriptor* table,
                                           const int32_t fragment_id,
                                           const int32_t max_fragment_id) {
  throw RequestedFragmentIdOutOfBoundsException{
      "Attempting to populate fragment id " + std::to_string(fragment_id) +
      " for foreign table " + table->tableName +
      " which is greater than the maximum fragment id of " +
      std::to_string(max_fragment_id) + "."};
}

std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
                                              const Catalog_Namespace::Catalog& catalog,
                                              const int32_t table_id,
                                              const int fragment_id) {
  CHECK(!buffers.empty());
  std::set<const ColumnDescriptor*> columns;
  for (const auto& entry : buffers) {
    CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
    const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
    const auto column = catalog.getMetadataForColumn(table_id, column_id);
    columns.emplace(column);
  }
  return columns;
}

bool skip_metadata_scan(const ColumnDescriptor* column) {
  return column->columnType.is_dict_encoded_type();
}
}  // namespace

void AbstractTextFileDataWrapper::populateChunkMapForColumns(
    const std::set<const ColumnDescriptor*>& columns,
    const int fragment_id,
    const ChunkToBufferMap& buffers,
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
  for (const auto column : columns) {
    ChunkKey data_chunk_key = {
        db_id_, foreign_table_->tableId, column->columnId, fragment_id};
    init_chunk_for_column(data_chunk_key,
                          chunk_metadata_map_,
                          buffers,
                          column_id_to_chunk_map[column->columnId]);
  }
}

void AbstractTextFileDataWrapper::populateChunkBuffers(
    const ChunkToBufferMap& required_buffers,
    const ChunkToBufferMap& optional_buffers,
    AbstractBuffer* delete_buffer) {
  auto timer = DEBUG_TIMER(__func__);
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  CHECK(!required_buffers.empty());

  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
  auto required_columns =
      get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
  populateChunkMapForColumns(
      required_columns, fragment_id, required_buffers, column_id_to_chunk_map);

  if (!optional_buffers.empty()) {
    auto optional_columns =
        get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
    populateChunkMapForColumns(
        optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
  }
  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
  if (!is_file_scan_in_progress_) {
    updateMetadata(column_id_to_chunk_map, fragment_id);
  }
}

// if a column was skipped during the scan, update its metadata now
void AbstractTextFileDataWrapper::updateMetadata(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  for (auto& entry : column_id_to_chunk_map) {
    const auto& column =
        catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
    if (skip_metadata_scan(column)) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_id};
      if (column->columnType.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
      // Allocate a new shared_ptr for the metadata so we don't modify the old one,
      // which may be in use by the executor
      auto cached_metadata_previous =
          shared::get_from_map(chunk_metadata_map_, data_chunk_key);
      shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
          std::make_shared<ChunkMetadata>();
      auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
      *cached_metadata = *cached_metadata_previous;
      auto chunk_metadata =
          entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
      cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
      cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
      cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
      cached_metadata->numBytes = entry.second.getBuffer()->size();
    }
  }
}

/**
 * Result of parsing a set of file regions: the file offset and row count of
 * the parsed span, parsed data blocks by column id, and the indices of any
 * rejected rows.
 */
struct ParseFileRegionResult {
  size_t file_offset;
  size_t row_count;
  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
  std::set<size_t> rejected_row_indices;

  bool operator<(const ParseFileRegionResult& other) const {
    return file_offset < other.file_offset;
  }
};

namespace {
void throw_unexpected_number_of_items(const size_t num_expected,
                                      const size_t num_loaded,
                                      const std::string& item_type,
                                      const std::string& foreign_table_name) {
  try {
    foreign_storage::throw_unexpected_number_of_items(
        num_expected, num_loaded, item_type);
  } catch (const foreign_storage::ForeignStorageException& except) {
    throw foreign_storage::ForeignStorageException(
        std::string(except.what()) + " Foreign table: " + foreign_table_name);
  }
}

}  // namespace

/**
 * Parses a set of file regions given a handle to the file reader and a range
 * of file region indices. Returns the parsed data blocks, row count, and the
 * indices of any rejected rows.
 */
ParseFileRegionResult parse_file_regions(
    const FileRegions& file_regions,
    const size_t start_index,
    const size_t end_index,
    FileReader& file_reader,
    ParseBufferRequest& parse_file_request,
    const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    const TextFileBufferParser& parser) {
  auto timer = DEBUG_TIMER(__func__);
  ParseFileRegionResult load_file_region_result{};
  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
  load_file_region_result.row_count = 0;

  ParseBufferResult result;
  for (size_t i = start_index; i <= end_index; i++) {
    CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
    auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
                                            file_regions[i].first_row_file_offset,
                                            file_regions[i].region_size);
    if (file_regions[i].region_size != read_size) {
      throw_unexpected_number_of_items(file_regions[i].region_size,
                                       read_size,
                                       "bytes",
                                       parse_file_request.getTableName());
    }
    parse_file_request.begin_pos = 0;
    parse_file_request.end_pos = file_regions[i].region_size;
    parse_file_request.first_row_index = file_regions[i].first_row_index;
    parse_file_request.file_offset = file_regions[i].first_row_file_offset;
    parse_file_request.process_row_count = file_regions[i].row_count;

    result = parser.parseBuffer(parse_file_request, i == end_index);
    CHECK_EQ(file_regions[i].row_count, result.row_count);
    for (const auto& rejected_row_index : result.rejected_rows) {
      load_file_region_result.rejected_row_indices.insert(
          load_file_region_result.row_count + rejected_row_index);
    }
    load_file_region_result.row_count += result.row_count;
  }
  load_file_region_result.column_id_to_data_blocks_map =
      result.column_id_to_data_blocks_map;
  return load_file_region_result;
}

namespace {

/**
 * Gets the appropriate buffer size to be used when processing file(s).
 */
size_t get_buffer_size(const import_export::CopyParams& copy_params,
                       const bool size_known,
                       const size_t file_size) {
  size_t buffer_size = copy_params.buffer_size;
  if (size_known && file_size < buffer_size) {
    buffer_size = file_size + 1;  // +1 for end of line character, if missing
  }
  return buffer_size;
}

size_t get_buffer_size(const FileRegions& file_regions) {
  size_t buffer_size = 0;
  for (const auto& file_region : file_regions) {
    buffer_size = std::max(buffer_size, file_region.region_size);
  }
  CHECK(buffer_size);
  return buffer_size;
}

/**
 * Gets the appropriate number of threads to be used for concurrent
 * processing within the data wrapper.
 */
size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const bool size_known,
                        const size_t file_size,
                        const size_t buffer_size) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count = std::thread::hardware_concurrency();
  }
  if (size_known && file_size > 0) {
    size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
    if (num_buffers_in_file < thread_count) {
      thread_count = num_buffers_in_file;
    }
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}
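// Illustrative arithmetic for the helper above (values are hypothetical, not
// from the source): with copy_params.threads == 0 on a machine reporting 8
// hardware threads, a known file_size of 100 bytes, and a buffer_size of 32
// bytes, num_buffers_in_file = (100 + 32 - 1) / 32 = 4, so the thread count
// is clamped from 8 down to 4 since extra threads would have no buffer to
// process.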

size_t get_thread_count(const import_export::CopyParams& copy_params,
                        const FileRegions& file_regions) {
  size_t thread_count = copy_params.threads;
  if (thread_count == 0) {
    thread_count =
        std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
  }
  CHECK_GT(thread_count, static_cast<size_t>(0));
  return thread_count;
}

void resize_delete_buffer(AbstractBuffer* delete_buffer,
                          const size_t chunk_element_count) {
  if (delete_buffer->size() < chunk_element_count) {
    auto remaining_rows = chunk_element_count - delete_buffer->size();
    std::vector<int8_t> data(remaining_rows, false);
    delete_buffer->append(data.data(), remaining_rows);
  }
}
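// A minimal usage sketch (values are hypothetical): if a chunk holds 1000 rows
// but the delete buffer currently holds 600 flags, the call below appends 400
// zero-initialized (not-deleted) flags so that every row has a delete flag.
//
//   resize_delete_buffer(delete_buffer, 1000);
//
// Flags for rejected rows are then set to true by the callers in this file.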

bool no_deferred_requests(MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> deferred_requests_lock(
      multi_threading_params.deferred_requests_mutex);
  return multi_threading_params.deferred_requests.empty();
}

bool is_file_scan_finished(const FileReader* file_reader,
                           MetadataScanMultiThreadingParams& multi_threading_params) {
  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
}

}  // namespace

void AbstractTextFileDataWrapper::populateChunks(
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id,
    AbstractBuffer* delete_buffer) {
  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);

  CHECK(!column_id_to_chunk_map.empty());

  // check to see if an iterative scan step is required
  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
      is_file_scan_in_progress_) {
    // check to see if there is more foreign data to scan
    if (is_first_file_scan_call_ ||
        !is_file_scan_finished(file_reader_.get(), multi_threading_params_)) {
      // NOTE: we can only guarantee the current `fragment_id` is fully done
      // iterative scan if either
      //   1) the scan is finished OR
      //   2) `fragment_id+1` exists in the internal map
      // this is why `fragment_id+1` is checked for below
      auto file_regions_it_one_ahead =
          fragment_id_to_file_regions_map_.find(fragment_id + 1);
      if (is_first_file_scan_call_ ||
          (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
        ChunkMetadataVector chunk_metadata_vector;
        IterativeFileScanParameters iterative_params{
            column_id_to_chunk_map, fragment_id, delete_buffer};
        iterativeFileScan(chunk_metadata_vector, iterative_params);
      }
    }

    file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
    if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
      CHECK(is_file_scan_finished(file_reader_.get(), multi_threading_params_));
      is_file_scan_in_progress_ = false;  // conclude the iterative scan is finished
      is_first_file_scan_call_ =
          true;  // any subsequent iterative request can assume they will be the first
      throw_fragment_id_out_of_bounds_error(
          foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
    } else {
      // iterative scan is required to have loaded all required chunks thus we
      // can exit early
      return;
    }
  }
  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());

  const auto& file_regions = file_regions_it->second;

  // File roll off can lead to empty file regions.
  if (file_regions.empty()) {
    return;
  }

  const auto buffer_size = get_buffer_size(file_regions);
  const auto thread_count = get_thread_count(copy_params, file_regions);

  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;

  std::vector<ParseBufferRequest> parse_file_requests{};
  parse_file_requests.reserve(thread_count);
  std::vector<std::future<ParseFileRegionResult>> futures{};
  std::set<int> column_filter_set;
  for (const auto& pair : column_id_to_chunk_map) {
    column_filter_set.insert(pair.first);
  }

  std::vector<std::unique_ptr<FileReader>> file_readers;
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  rapidjson::Document d;
  auto& server_options = foreign_table_->foreign_server->options;
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  const auto file_path = getFullFilePath(foreign_table_);
  auto& parser = getFileBufferParser();

  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
    parse_file_requests.emplace_back(buffer_size,
                                     copy_params,
                                     db_id_,
                                     foreign_table_,
                                     column_filter_set,
                                     file_path,
                                     &render_group_analyzer_map_,
                                     delete_buffer != nullptr);
    auto start_index = i;
    auto end_index =
        std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);

    if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
      file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
          file_path, copy_params, reader_metadata));
    } else {
      UNREACHABLE();
    }

    futures.emplace_back(std::async(std::launch::async,
                                    parse_file_regions,
                                    std::ref(file_regions),
                                    start_index,
                                    end_index,
                                    std::ref(*(file_readers.back())),
                                    std::ref(parse_file_requests.back()),
                                    std::ref(column_id_to_chunk_map),
                                    std::ref(parser)));
  }

  for (auto& future : futures) {
    future.wait();
  }

  std::vector<ParseFileRegionResult> load_file_region_results{};
  for (auto& future : futures) {
    load_file_region_results.emplace_back(future.get());
  }

  std::set<size_t> chunk_rejected_row_indices;
  size_t chunk_offset = 0;
  for (auto result : load_file_region_results) {
    for (auto& [column_id, chunk] : column_id_to_chunk_map) {
      chunk.appendData(
          result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
    }
    for (const auto& rejected_row_index : result.rejected_row_indices) {
      chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
    }
    chunk_offset += result.row_count;
  }

  if (delete_buffer) {
    // ensure delete buffer is sized appropriately
    resize_delete_buffer(delete_buffer, chunk_offset);

    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto rejected_row_index : chunk_rejected_row_indices) {
      delete_buffer_data[rejected_row_index] = true;
    }
  }
}

/**
 * Computes the number of rows to process such that processing stays within
 * the fragment that contains `start_row_index`, given that fragments hold at
 * most `max_fragment_size` rows.
 */
size_t num_rows_to_process(const size_t start_row_index,
                           const size_t max_fragment_size,
                           const size_t rows_remaining) {
  size_t start_position_in_fragment = start_row_index % max_fragment_size;
  return max_fragment_size - start_position_in_fragment;
}
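// Worked example (illustrative values): with start_row_index = 100 and
// max_fragment_size = 64, start_position_in_fragment = 100 % 64 = 36, so the
// function returns 64 - 36 = 28, the number of rows up to the next fragment
// boundary.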

/**
 * Partitions a buffer of rows into subsets that do not cross fragment
 * boundaries, given the index of the first row in the buffer, the maximum
 * fragment size, and the number of rows in the buffer.
 */
std::vector<size_t> partition_by_fragment(const size_t start_row_index,
                                          const size_t max_fragment_size,
                                          const size_t buffer_row_count) {
  CHECK(buffer_row_count > 0);
  std::vector<size_t> partitions{};
  size_t remaining_rows_in_last_fragment;
  if (start_row_index % max_fragment_size == 0) {
    remaining_rows_in_last_fragment = 0;
  } else {
    remaining_rows_in_last_fragment =
        max_fragment_size - (start_row_index % max_fragment_size);
  }
  if (buffer_row_count <= remaining_rows_in_last_fragment) {
    partitions.emplace_back(buffer_row_count);
  } else {
    if (remaining_rows_in_last_fragment > 0) {
      partitions.emplace_back(remaining_rows_in_last_fragment);
    }
    size_t remaining_buffer_row_count =
        buffer_row_count - remaining_rows_in_last_fragment;
    while (remaining_buffer_row_count > 0) {
      partitions.emplace_back(
          std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
      remaining_buffer_row_count -= partitions.back();
    }
  }
  return partitions;
}
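// Worked example (illustrative values): partition_by_fragment(100, 64, 200)
// first emits the 28 rows that complete the fragment containing row 100
// (64 - 100 % 64 = 28), then whole fragments of 64 rows, then the remainder:
// {28, 64, 64, 44}, which sums back to the 200 buffered rows.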

/**
 * Gets the next metadata scan request from the pending requests queue,
 * blocking until a request is available or processing has been aborted.
 * Returns an empty optional when there are no more requests to process.
 */
std::optional<ParseBufferRequest> get_next_scan_request(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> pending_requests_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_lock, [&multi_threading_params] {
        return !multi_threading_params.pending_requests.empty() ||
               !multi_threading_params.continue_processing;
      });
  if (multi_threading_params.pending_requests.empty()) {
    return {};
  }
  auto request = std::move(multi_threading_params.pending_requests.front());
  multi_threading_params.pending_requests.pop();
  pending_requests_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
  return std::move(request);
}
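// Illustrative consumer-loop usage (a sketch, not code from this file): worker
// threads repeatedly pull requests until an empty optional signals shutdown:
//
//   while (auto request_opt = get_next_scan_request(params)) {
//     process(*request_opt);  // `process` is a hypothetical handler
//   }
//
// Producers wake waiting consumers either by enqueueing a request or by
// setting `continue_processing` to false and notifying the condition variable.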

/**
 * Creates a new file region based on metadata from a parsed file buffer and
 * adds the new region to the fragment id to file regions map.
 */
void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                     int fragment_id,
                     size_t first_row_index,
                     const ParseBufferResult& result,
                     const std::string& file_path) {
  fragment_id_to_file_regions_map[fragment_id].emplace_back(
      // file naming is handled by FileReader
      FileRegion(file_path,
                 result.row_offsets.front(),
                 first_row_index,
                 result.row_count,
                 result.row_offsets.back() - result.row_offsets.front()));
}

/**
 * Updates the statistics of the given encoder, based on the data type of the
 * column and the given data block.
 */
void update_stats(Encoder* encoder,
                  const SQLTypeInfo& column_type,
                  DataBlockPtr data_block,
                  const size_t row_count) {
  if (column_type.is_array()) {
    encoder->updateStats(data_block.arraysPtr, 0, row_count);
  } else if (!column_type.is_varlen()) {
    encoder->updateStats(data_block.numbersPtr, row_count);
  } else {
    encoder->updateStats(data_block.stringsPtr, 0, row_count);
  }
}
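// For example (illustrative): an ARRAY column routes through
// data_block.arraysPtr, a fixed-width column such as INTEGER through
// data_block.numbersPtr, and a remaining variable-length column such as
// non-dictionary TEXT through data_block.stringsPtr. DataBlockPtr is a union
// of these three pointers, so exactly one member is meaningful per column.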
namespace {
foreign_storage::ForeignStorageCache* get_cache_if_enabled(
    std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
    const bool disable_cache) {
  if (!disable_cache && catalog->getDataMgr()
                            .getPersistentStorageMgr()
                            ->getDiskCacheConfig()
                            .isEnabledForFSI()) {
    return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
  } else {
    return nullptr;
  }
}
}  // namespace

// If cache is enabled, populate cached_chunks buffers with data blocks
void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
                  DataBlockPtr data_block,
                  size_t row_count,
                  ChunkKey& chunk_key,
                  const ColumnDescriptor* column,
                  bool is_first_block,
                  bool disable_cache) {
  auto catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
  CHECK(catalog);
  auto cache = get_cache_if_enabled(catalog, disable_cache);
  if (cache) {
    // This extra filter needs to be here because this wrapper is the only one that
    // accesses the cache directly and it should not be inserting chunks which are not
    // mapped to the current leaf (in distributed mode).
    if (key_does_not_shard_to_leaf(chunk_key)) {
      return;
    }

    ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
                          chunk_key[CHUNK_KEY_TABLE_IDX],
                          chunk_key[CHUNK_KEY_COLUMN_IDX],
                          chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                          2};
    // Create actual data chunks to prepopulate cache
    if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
      cached_chunks[chunk_key] = Chunk_NS::Chunk{column, false};
      cached_chunks[chunk_key].setBuffer(
          cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
      if (column->columnType.is_varlen_indeed()) {
        cached_chunks[chunk_key].setIndexBuffer(
            cache->getChunkBufferForPrecaching(index_key, is_first_block));
      }
      if (is_first_block) {
        cached_chunks[chunk_key].initEncoder();
      }
    }
    cached_chunks[chunk_key].appendData(data_block, row_count, 0);
  }
}

void append_data_block_to_chunk(
    const foreign_storage::IterativeFileScanParameters& file_scan_param,
    DataBlockPtr data_block,
    size_t row_count,
    const ChunkKey& chunk_key,
    const ColumnDescriptor* column,
    const std::set<size_t>& rejected_row_indices,
    const size_t element_count_required) {
  auto chunk = shared::get_from_map(file_scan_param.column_id_to_chunk_map,
                                    chunk_key[CHUNK_KEY_COLUMN_IDX]);

  auto& conditional_variable =
      file_scan_param.getChunkConditionalVariable(chunk_key[CHUNK_KEY_COLUMN_IDX]);
  {
    std::unique_lock<std::mutex> chunk_lock(
        file_scan_param.getChunkMutex(chunk_key[CHUNK_KEY_COLUMN_IDX]));
    conditional_variable.wait(chunk_lock, [element_count_required, &chunk]() {
      return chunk.getBuffer()->getEncoder()->getNumElems() == element_count_required;
    });

    chunk.appendData(data_block, row_count, 0);
  }

  conditional_variable
      .notify_all();  // notify any threads waiting on the correct element count

  if (file_scan_param.delete_buffer) {
    std::unique_lock delete_buffer_lock(file_scan_param.delete_buffer_mutex);
    auto& delete_buffer = file_scan_param.delete_buffer;
    auto chunk_offset = element_count_required;
    auto chunk_element_count = chunk_offset + row_count;

    // ensure delete buffer is sized appropriately
    resize_delete_buffer(delete_buffer, chunk_element_count);

    auto delete_buffer_data = delete_buffer->getMemoryPtr();
    for (const auto rejected_row_index : rejected_row_indices) {
      CHECK(rejected_row_index + chunk_offset < delete_buffer->size());
      delete_buffer_data[rejected_row_index + chunk_offset] = true;
    }
  }
}

void populate_chunks_using_data_blocks(
    MetadataScanMultiThreadingParams& multi_threading_params,
    int fragment_id,
    const ParseBufferRequest& request,
    ParseBufferResult& result,
    std::map<int, const ColumnDescriptor*>& column_by_id,
    std::map<int, FileRegions>& fragment_id_to_file_regions_map,
    const foreign_storage::IterativeFileScanParameters& file_scan_param) {
  std::unique_lock<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
  // File regions should be added in same order as appendData
  add_file_region(fragment_id_to_file_regions_map,
                  fragment_id,
                  request.first_row_index,
                  result,
                  request.getFilePath());
  CHECK_EQ(fragment_id, file_scan_param.fragment_id);

  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
    ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
    const auto column = column_by_id[column_id];
    if (column->columnType.is_varlen_indeed()) {
      chunk_key.emplace_back(1);
    }
    if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
        multi_threading_params.chunk_encoder_buffers.end()) {
      multi_threading_params.chunk_encoder_buffers[chunk_key] =
          std::make_unique<ForeignStorageBuffer>();
      multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
          column->columnType);
    }
    size_t current_element_count =
        multi_threading_params.chunk_encoder_buffers[chunk_key]
            ->getEncoder()
            ->getNumElems();
    size_t num_elements = current_element_count + result.row_count;
    multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
        num_elements);
    lock.unlock();  // unlock the fragment based lock in order to achieve better
                    // performance
    append_data_block_to_chunk(file_scan_param,
                               data_block,
                               result.row_count,
                               chunk_key,
                               column,
                               result.rejected_rows,
                               current_element_count);
    lock.lock();
  }
}

/**
 * Updates metadata encapsulated in encoders for all table columns given a new
 * parsed batch of data. If a cache is available, also appends the data blocks
 * to chunks in the cache.
 */
void process_data_blocks(MetadataScanMultiThreadingParams& multi_threading_params,
                         int fragment_id,
                         const ParseBufferRequest& request,
                         ParseBufferResult& result,
                         std::map<int, const ColumnDescriptor*>& column_by_id,
                         std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
  // File regions should be added in same order as appendData
  add_file_region(fragment_id_to_file_regions_map,
                  fragment_id,
                  request.first_row_index,
                  result,
                  request.getFilePath());

  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
    ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
    const auto column = column_by_id[column_id];
    if (column->columnType.is_varlen_indeed()) {
      chunk_key.emplace_back(1);
    }
    if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
        multi_threading_params.chunk_encoder_buffers.end()) {
      multi_threading_params.chunk_encoder_buffers[chunk_key] =
          std::make_unique<ForeignStorageBuffer>();
      multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
          column->columnType);
    }
    update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
                 column->columnType,
                 data_block,
                 result.row_count);
    size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
                              ->getEncoder()
                              ->getNumElems() +
                          result.row_count;
    multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
        num_elements);
    cache_blocks(
        multi_threading_params.cached_chunks,
        data_block,
        result.row_count,
        chunk_key,
        column,
        (num_elements - result.row_count) == 0,  // Is the first block added to this chunk
        multi_threading_params.disable_cache);
  }
}

/**
 * Adds the request object for a processed request back to the request pool
 * for reuse in subsequent requests.
 */
void add_request_to_pool(MetadataScanMultiThreadingParams& multi_threading_params,
                         ParseBufferRequest& request) {
  std::unique_lock<std::mutex> completed_requests_queue_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool.emplace(std::move(request));
  completed_requests_queue_lock.unlock();
  multi_threading_params.request_pool_condition.notify_all();
}

/**
 * Consumes and processes metadata scan requests from a pending requests queue
 * and updates existing metadata objects based on newly scanned metadata.
 */
void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
                   std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                   const TextFileBufferParser& parser) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      auto partitions = partition_by_fragment(
          request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
      request.begin_pos = 0;
      size_t row_index = request.first_row_index;
      for (const auto partition : partitions) {
        request.process_row_count = partition;
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        auto result = parser.parseBuffer(request, true);
        int fragment_id = row_index / request.getMaxFragRows();
        process_data_blocks(multi_threading_params,
                            fragment_id,
                            request,
                            result,
                            column_by_id,
                            fragment_id_to_file_regions_map);
        row_index += result.row_count;
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }
    } catch (...) {
      // Re-add request to pool so we don't block any other threads
      {
        std::lock_guard<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      add_request_to_pool(multi_threading_params, request);
      throw;
    }
    add_request_to_pool(multi_threading_params, request);
  }
}

/**
 * Gets a request from the metadata scan request pool, blocking until a
 * request becomes available.
 */
ParseBufferRequest get_request_from_pool(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> request_pool_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool_condition.wait(
      request_pool_lock,
      [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
  auto request = std::move(multi_threading_params.request_pool.front());
  multi_threading_params.request_pool.pop();
  request_pool_lock.unlock();
  CHECK(request.buffer);
  return request;
}

/*
 * Defer processing a request until the next iteration. The use case for this
 * is during an iterative scan, when some requests must defer processing until
 * the correct fragment is being processed.
 */
void defer_scan_request(MetadataScanMultiThreadingParams& multi_threading_params,
                        ParseBufferRequest& request) {
  std::unique_lock<std::mutex> deferred_requests_lock(
      multi_threading_params.deferred_requests_mutex);
  multi_threading_params.deferred_requests.emplace(std::move(request));
}

/*
 * Dispatch all requests that are currently deferred onto the pending request queue.
 */
void dispatch_all_deferred_requests(
    MetadataScanMultiThreadingParams& multi_threading_params) {
  std::unique_lock<std::mutex> deferred_requests_lock(
      multi_threading_params.deferred_requests_mutex);
  {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);

    while (!multi_threading_params.deferred_requests.empty()) {
      auto& request = multi_threading_params.deferred_requests.front();
      multi_threading_params.pending_requests.emplace(std::move(request));
      multi_threading_params.deferred_requests.pop();
    }
    multi_threading_params.pending_requests_condition.notify_all();
  }
}

/**
 * Dispatches a new metadata scan request by adding the request to the pending
 * requests queue to be consumed by a worker thread.
 */
void dispatch_scan_request(MetadataScanMultiThreadingParams& multi_threading_params,
                           ParseBufferRequest& request) {
  {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);
    multi_threading_params.pending_requests.emplace(std::move(request));
  }
  multi_threading_params.pending_requests_condition.notify_all();
}

/**
 * Consumes and processes scan requests from a pending requests queue and
 * populates chunks during an iterative file scan.
 */
void populate_chunks(MetadataScanMultiThreadingParams& multi_threading_params,
                     std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                     const TextFileBufferParser& parser,
                     foreign_storage::IterativeFileScanParameters& file_scan_param) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    ParseBufferRequest& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      for (size_t num_rows_left_to_process =
               request.buffer_row_count - request.processed_row_count;
           num_rows_left_to_process > 0;
           num_rows_left_to_process =
               request.buffer_row_count - request.processed_row_count) {
        // NOTE: `request.begin_pos` state is required to be set correctly by this point
        // in execution
        size_t row_index = request.first_row_index + request.processed_row_count;
        int fragment_id = row_index / request.getMaxFragRows();
        if (fragment_id >
            file_scan_param.fragment_id) {  // processing must continue next iteration
          defer_scan_request(multi_threading_params, request);
          return;
        }
        request.process_row_count = num_rows_to_process(
            row_index, request.getMaxFragRows(), num_rows_left_to_process);
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        auto result = parser.parseBuffer(request, true);
        populate_chunks_using_data_blocks(multi_threading_params,
                                          fragment_id,
                                          request,
                                          result,
                                          column_by_id,
                                          fragment_id_to_file_regions_map,
                                          file_scan_param);
        request.processed_row_count += result.row_count;
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }

    } catch (...) {
      // Re-add request to pool so we don't block any other threads
      {
        std::lock_guard<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      add_request_to_pool(multi_threading_params, request);
      throw;
    }
    add_request_to_pool(multi_threading_params, request);
  }
}

/**
 * Reallocates the given buffer to the given allocation size, if the current
 * buffer size is smaller than the allocation size.
 */
void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
                             size_t& buffer_size,
                             const size_t alloc_size) {
  CHECK_LE(buffer_size, alloc_size);
  if (buffer_size < alloc_size) {
    buffer = std::make_unique<char[]>(alloc_size);
    buffer_size = alloc_size;
  }
}

void reset_multithreading_params(
    foreign_storage::MetadataScanMultiThreadingParams& multi_threading_params) {
  multi_threading_params.request_pool = {};
  multi_threading_params.cached_chunks = {};
  multi_threading_params.pending_requests = {};
  multi_threading_params.deferred_requests = {};
  multi_threading_params.chunk_encoder_buffers.clear();
}

/**
 * Reads from the file(s) and dispatches buffers of complete rows to worker
 * threads, carrying any residual (incomplete) last row over between reads.
 */
void dispatch_scan_requests(
    const foreign_storage::ForeignTable* table,
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser,
    const foreign_storage::IterativeFileScanParameters* file_scan_param,
    foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer&
        iterative_residual_buffer,
    const bool is_first_file_scan_call) {
  auto& alloc_size = iterative_residual_buffer.alloc_size;
  auto& residual_buffer = iterative_residual_buffer.residual_data;
  auto& residual_buffer_size = iterative_residual_buffer.residual_buffer_size;
  auto& residual_buffer_alloc_size = iterative_residual_buffer.residual_buffer_alloc_size;

  if (is_first_file_scan_call) {
    alloc_size = buffer_size;
    residual_buffer = std::make_unique<char[]>(alloc_size);
    residual_buffer_size = 0;
    residual_buffer_alloc_size = alloc_size;
  } else if (!no_deferred_requests(multi_threading_params)) {
    dispatch_all_deferred_requests(multi_threading_params);
  }

  while (!file_reader.isScanFinished()) {
    {
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      if (!multi_threading_params.continue_processing) {
        break;
      }
    }
    auto request = get_request_from_pool(multi_threading_params);
    request.full_path = file_reader.getCurrentFilePath();
    resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);

    if (residual_buffer_size > 0) {
      memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
    }
    size_t size = residual_buffer_size;
    size += file_reader.read(request.buffer.get() + residual_buffer_size,
                             alloc_size - residual_buffer_size);

    if (size == 0) {
      // In some cases at the end of a file we will read 0 bytes even when
      // file_reader.isScanFinished() is false. Also add request back to the pool to be
      // picked up again in the next iteration.
      add_request_to_pool(multi_threading_params, request);
      continue;
    } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
      // In some cases files with newlines at the end will be encoded with a second
      // newline that can end up being the only thing in the buffer. Also add request
      // back to the pool to be picked up again in the next iteration.
      current_file_offset++;
      add_request_to_pool(multi_threading_params, request);
      continue;
    }
    unsigned int num_rows_in_buffer = 0;
    request.end_pos = parser.findRowEndPosition(alloc_size,
                                                request.buffer,
                                                size,
                                                copy_params,
                                                first_row_index_in_buffer,
                                                num_rows_in_buffer,
                                                &file_reader);
    request.buffer_size = size;
    request.buffer_alloc_size = alloc_size;
    request.first_row_index = first_row_index_in_buffer;
    request.file_offset = current_file_offset;
    request.buffer_row_count = num_rows_in_buffer;
    request.processed_row_count = 0;
    request.begin_pos = 0;

    residual_buffer_size = size - request.end_pos;
    if (residual_buffer_size > 0) {
      resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
      memcpy(residual_buffer.get(),
             request.buffer.get() + request.end_pos,
             residual_buffer_size);
    }

    current_file_offset += request.end_pos;
    first_row_index_in_buffer += num_rows_in_buffer;

    if (num_rows_in_buffer > 0) {
      dispatch_scan_request(multi_threading_params, request);
    } else {
      add_request_to_pool(multi_threading_params, request);
    }

    if (file_scan_param) {
      const int32_t last_fragment_index =
          (first_row_index_in_buffer) / table->maxFragRows;
      if (last_fragment_index > file_scan_param->fragment_id) {
        break;
      }
    }
  }

  std::unique_lock<std::mutex> pending_requests_queue_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_queue_lock, [&multi_threading_params] {
        return multi_threading_params.pending_requests.empty() ||
               (multi_threading_params.continue_processing == false);
      });
  multi_threading_params.continue_processing = false;
  pending_requests_queue_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
}
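// The loop above is the producer half of a producer/consumer pipeline: it
// fills a request buffer, hands complete-row buffers to worker threads via
// dispatch_scan_request(), and carries any partial trailing line over in
// `residual_buffer`. A sketch of one handoff (sizes are hypothetical):
//
//   read 4096 bytes -> findRowEndPosition() reports rows end at byte 4000
//   -> dispatch a request covering bytes [0, 4000)
//   -> copy bytes [4000, 4096) into residual_buffer for the next request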

void dispatch_scan_requests_with_exception_handling(
    const foreign_storage::ForeignTable* table,
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser,
    const foreign_storage::IterativeFileScanParameters* file_scan_param,
    foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer&
        iterative_residual_buffer,
    const bool is_first_file_scan_call) {
  try {
    dispatch_scan_requests(table,
                           buffer_size,
                           file_path,
                           file_reader,
                           copy_params,
                           multi_threading_params,
                           first_row_index_in_buffer,
                           current_file_offset,
                           parser,
                           file_scan_param,
                           iterative_residual_buffer,
                           is_first_file_scan_call);
  } catch (...) {
    {
      std::unique_lock<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      multi_threading_params.continue_processing = false;
    }
    multi_threading_params.pending_requests_condition.notify_all();
    throw;
  }
}

namespace {
// Create metadata for unscanned columns.
// Any fragments with any updated rows between start_row and num_rows will be
// updated. Chunks prior to start_row will be restored from existing metadata
// (i.e. for append workflows).
void add_placeholder_metadata(
    const ColumnDescriptor* column,
    const ForeignTable* foreign_table,
    const int db_id,
    const size_t start_row,
    const size_t total_num_rows,
    std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
  if (column->columnType.is_varlen_indeed()) {
    chunk_key.emplace_back(1);
  }

  // Create placeholder metadata for every fragment touched by this scan
  int start_fragment = start_row / foreign_table->maxFragRows;
  int end_fragment{0};
  if (total_num_rows > 0) {
    end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
  }
  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
    size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
                                               (fragment_id + 1)) > total_num_rows)
                              ? total_num_rows % foreign_table->maxFragRows
                              : foreign_table->maxFragRows;

    chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
    chunk_metadata_map[chunk_key] =
        get_placeholder_metadata(column->columnType, num_elements);
  }
}

void initialize_non_append_mode_scan(
    const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
    const std::map<int, FileRegions>& fragment_id_to_file_regions_map,
    const foreign_storage::OptionsMap& server_options,
    std::unique_ptr<FileReader>& file_reader,
    const std::string& file_path,
    const import_export::CopyParams& copy_params,
    const shared::FilePathOptions& file_path_options,
    const std::optional<size_t>& max_file_count,
    const foreign_storage::ForeignTable* foreign_table,
    const foreign_storage::UserMapping* user_mapping,
    const foreign_storage::TextFileBufferParser& parser,
    std::function<std::string()> get_s3_key,
    size_t& num_rows,
    size_t& append_start_offset) {
  // Should only be called once for non-append tables
  CHECK(chunk_metadata_map.empty());
  CHECK(fragment_id_to_file_regions_map.empty());
  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
          ->second ==
      AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {
    file_reader = std::make_unique<LocalMultiFileReader>(
        file_path, copy_params, file_path_options, max_file_count);
  } else {
    UNREACHABLE();
  }
  parser.validateFiles(file_reader.get(), foreign_table);
  num_rows = 0;
  append_start_offset = 0;
}
}  // namespace

/**
 * Populates the given chunk metadata vector with metadata for all table
 * chunks, scanning the underlying file(s) as needed.
 */
void AbstractTextFileDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  auto timer = DEBUG_TIMER(__func__);

  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto file_path = getFullFilePath(foreign_table_);
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  auto& parser = getFileBufferParser();
  const auto file_path_options = getFilePathOptions(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;
  std::set<std::string> rolled_off_files;
  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
    auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
    if (allowFileRollOff(foreign_table_) && multi_file_reader) {
      rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
    }
    parser.validateFiles(file_reader_.get(), foreign_table_);
    if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
      file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
    } else {
      UNREACHABLE();
    }
  } else {
    initialize_non_append_mode_scan(
        chunk_metadata_map_,
        fragment_id_to_file_regions_map_,
        server_options,
        file_reader_,
        file_path,
        copy_params,
        file_path_options,
        getMaxFileCount(),
        foreign_table_,
        user_mapping_,
        parser,
        [this] { return ""; },
        num_rows_,
        append_start_offset_);
  }

  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }
  MetadataScanMultiThreadingParams multi_threading_params;
  multi_threading_params.disable_cache = disable_cache_;

  // Restore previous chunk data
  if (foreign_table_->isAppendMode()) {
    multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
  }

  std::set<int> columns_to_scan;
  for (auto column : columns) {
    if (!skip_metadata_scan(column)) {
      columns_to_scan.insert(column->columnId);
    }
  }

  // Track where scan started for appends
  int start_row = num_rows_;
  if (!file_reader_->isScanFinished()) {
    auto buffer_size = get_buffer_size(copy_params,
                                       file_reader_->isRemainingSizeKnown(),
                                       file_reader_->getRemainingSize());
    auto thread_count = get_thread_count(copy_params,
                                         file_reader_->isRemainingSizeKnown(),
                                         file_reader_->getRemainingSize(),
                                         buffer_size);
    multi_threading_params.continue_processing = true;

    std::vector<std::future<void>> futures{};
    for (size_t i = 0; i < thread_count; i++) {
      multi_threading_params.request_pool.emplace(buffer_size,
                                                  copy_params,
                                                  db_id_,
                                                  foreign_table_,
                                                  columns_to_scan,
                                                  file_path,
                                                  nullptr,
                                                  disable_cache_);

      futures.emplace_back(std::async(std::launch::async,
                                      scan_metadata,
                                      std::ref(multi_threading_params),
                                      std::ref(fragment_id_to_file_regions_map_),
                                      std::ref(parser)));
    }

    ResidualBuffer residual_buffer;
    dispatch_scan_requests_with_exception_handling(foreign_table_,
                                                   buffer_size,
                                                   file_path,
                                                   (*file_reader_),
                                                   copy_params,
                                                   multi_threading_params,
                                                   num_rows_,
                                                   append_start_offset_,
                                                   parser,
                                                   nullptr,
                                                   residual_buffer,
                                                   true);

    for (auto& future : futures) {
      // get() instead of wait() because we need to propagate potential exceptions.
      future.get();
    }
  }

  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
    auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
    CHECK(column_entry != column_by_id.end());
    const auto& column_type = column_entry->second->columnType;
    auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
    chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
    const auto& cached_chunks = multi_threading_params.cached_chunks;
    if (!column_type.is_varlen_indeed()) {
      chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
    } else if (auto chunk_entry = cached_chunks.find(chunk_key);
               chunk_entry != cached_chunks.end()) {
      auto buffer = chunk_entry->second.getBuffer();
      CHECK(buffer);
      chunk_metadata->numBytes = buffer->size();
    } else {
      CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
    }
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }

  for (auto column : columns) {
    if (skip_metadata_scan(column)) {
      add_placeholder_metadata(
          column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
    }
  }

  if (!rolled_off_files.empty()) {
    updateRolledOffChunks(rolled_off_files, column_by_id);
  }

  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }

  // Save chunk data
  if (foreign_table_->isAppendMode()) {
    chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
  }
}

void AbstractTextFileDataWrapper::iterativeFileScan(
    ChunkMetadataVector& chunk_metadata_vector,
    IterativeFileScanParameters& file_scan_param) {
  auto timer = DEBUG_TIMER(__func__);

  is_file_scan_in_progress_ = true;

  CHECK(!foreign_table_->isAppendMode())
      << " iterative file scan can not be used with APPEND mode.";

  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto file_path = getFullFilePath(foreign_table_);
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  auto& parser = getFileBufferParser();
  const auto file_path_options = getFilePathOptions(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;

  if (is_first_file_scan_call_) {
    initialize_non_append_mode_scan(
        chunk_metadata_map_,
        fragment_id_to_file_regions_map_,
        server_options,
        file_reader_,
        file_path,
        copy_params,
        file_path_options,
        getMaxFileCount(),
        foreign_table_,
        user_mapping_,
        parser,
        [this] { return ""; },
        num_rows_,
        append_start_offset_);
  }

  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }

  if (is_first_file_scan_call_) {  // reinitialize all members that may have state in
                                   // `multi_threading_params_`
    reset_multithreading_params(multi_threading_params_);
  }

  multi_threading_params_.disable_cache = disable_cache_;

  std::set<int> columns_to_scan;
  for (auto column : columns) {
    columns_to_scan.insert(column->columnId);
  }

  if (!is_file_scan_finished(file_reader_.get(), multi_threading_params_)) {
    if (is_first_file_scan_call_) {
      // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
      // scan
      buffer_size_ = get_buffer_size(copy_params,
                                     file_reader_->isRemainingSizeKnown(),
                                     file_reader_->getRemainingSize());
      thread_count_ = get_thread_count(copy_params,
                                       file_reader_->isRemainingSizeKnown(),
                                       file_reader_->getRemainingSize(),
                                       buffer_size_);
    }
    multi_threading_params_.continue_processing = true;

    std::vector<std::future<void>> futures{};
    for (size_t i = 0; i < thread_count_; i++) {
      if (is_first_file_scan_call_) {
        multi_threading_params_.request_pool.emplace(buffer_size_,
                                                     copy_params,
                                                     db_id_,
                                                     foreign_table_,
                                                     columns_to_scan,
                                                     file_path,
                                                     &render_group_analyzer_map_,
                                                     true);
      }
      futures.emplace_back(std::async(std::launch::async,
                                      populate_chunks,
                                      std::ref(multi_threading_params_),
                                      std::ref(fragment_id_to_file_regions_map_),
                                      std::ref(parser),
                                      std::ref(file_scan_param)));
    }

    dispatch_scan_requests_with_exception_handling(foreign_table_,
                                                   buffer_size_,
                                                   file_path,
                                                   (*file_reader_),
                                                   copy_params,
                                                   multi_threading_params_,
                                                   num_rows_,
                                                   append_start_offset_,
                                                   parser,
                                                   &file_scan_param,
                                                   residual_buffer_,
                                                   is_first_file_scan_call_);

    for (auto& future : futures) {
      // get() instead of wait() because we need to propagate potential exceptions.
      future.get();
    }
  }

  if (is_first_file_scan_call_) {
    is_first_file_scan_call_ = false;
  }

  if (is_file_scan_finished(file_reader_.get(), multi_threading_params_)) {
    is_file_scan_in_progress_ = false;
  }
}

void AbstractTextFileDataWrapper::updateRolledOffChunks(
    const std::set<std::string>& rolled_off_files,
    const std::map<int32_t, const ColumnDescriptor*>& column_by_id) {
  std::set<int32_t> deleted_fragment_ids;
  std::optional<int32_t> partially_deleted_fragment_id;
  std::optional<size_t> partially_deleted_fragment_row_count;
  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
    bool file_region_deleted{false};
    for (auto it = file_regions.begin(); it != file_regions.end();) {
      if (shared::contains(rolled_off_files, it->file_path)) {
        it = file_regions.erase(it);
        file_region_deleted = true;
      } else {
        it++;
      }
    }
    if (file_regions.empty()) {
      deleted_fragment_ids.emplace(fragment_id);
    } else if (file_region_deleted) {
      partially_deleted_fragment_id = fragment_id;
      partially_deleted_fragment_row_count = 0;
      for (const auto& file_region : file_regions) {
        partially_deleted_fragment_row_count.value() += file_region.row_count;
      }
      break;
    }
  }

  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
      chunk_metadata->numElements = 0;
      chunk_metadata->numBytes = 0;
    } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
      CHECK(partially_deleted_fragment_row_count.has_value());
      auto old_chunk_stats = chunk_metadata->chunkStats;
      auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
      chunk_metadata = get_placeholder_metadata(
          cd->columnType, partially_deleted_fragment_row_count.value());
      // Old chunk stats will still be correct (since only row deletion is occurring)
      // and more accurate than that of the placeholder metadata.
      chunk_metadata->chunkStats = old_chunk_stats;
    }
  }
}

std::string AbstractTextFileDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();

  // Save fragment map
  json_utils::add_value_to_object(d,
                                  fragment_id_to_file_regions_map_,
                                  "fragment_id_to_file_regions_map",
                                  d.GetAllocator());

  // Save reader metadata
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());

  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
  json_utils::add_value_to_object(
      d, append_start_offset_, "append_start_offset", d.GetAllocator());

  return json_utils::write_to_string(d);
}
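// The serialized string produced above is a JSON object; a sketch of its
// shape (field values are illustrative, nested contents elided):
//
//   {
//     "fragment_id_to_file_regions_map": { ... },
//     "reader_metadata": { ... },
//     "num_rows": 12345,
//     "append_start_offset": 0
//   }
//
// restoreDataWrapperInternals() below reads these same four fields back.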

void AbstractTextFileDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  // Restore fragment map
  json_utils::get_value_from_object(
      d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");

  // Construct reader with metadata
  CHECK(d.HasMember("reader_metadata"));
  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto full_file_path = getFullFilePath(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_reader_ = std::make_unique<LocalMultiFileReader>(
        full_file_path, copy_params, d["reader_metadata"]);
  } else {
    UNREACHABLE();
  }

  json_utils::get_value_from_object(d, num_rows_, "num_rows");
  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");

  // Now restore the internal metadata maps
  CHECK(chunk_metadata_map_.empty());
  CHECK(chunk_encoder_buffers_.empty());

  for (auto& pair : chunk_metadata) {
    chunk_metadata_map_[pair.first] = pair.second;

    if (foreign_table_->isAppendMode()) {
      // Restore encoder state for append mode
      chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
      chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
      chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
      chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
          pair.second->numElements);
      chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
          pair.second->chunkStats);
      chunk_encoder_buffers_[pair.first]->setUpdated();
    }
  }
  is_restored_ = true;
}

bool AbstractTextFileDataWrapper::isRestored() const {
  return is_restored_;
}

// declared in three derived classes to avoid
// polluting ForeignDataWrapper virtual base
// @TODO refactor to lower class if needed
void AbstractTextFileDataWrapper::createRenderGroupAnalyzers() {
  // must have these
  CHECK_GE(db_id_, 0);
  CHECK(foreign_table_);

  // populate map for all poly columns in this table
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  for (auto const& column : columns) {
    if (IS_GEO_POLY(column->columnType.get_type())) {
      CHECK(render_group_analyzer_map_
                .try_emplace(column->columnId,
                             std::make_unique<import_export::RenderGroupAnalyzer>())
                .second);
    }
  }
}

std::optional<size_t> AbstractTextFileDataWrapper::getMaxFileCount() const {
  return {};
}

}  // namespace foreign_storage
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
virtual std::optional< size_t > getMaxFileCount() const
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
bool is_dict_encoded_type() const
Definition: sqltypes.h:639
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
void dispatch_all_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)
specifies the content in-memory of a row in the column metadata table
void populate_chunks_using_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::IterativeFileScanParameters &file_scan_param)
std::list< const ColumnDescriptor * > getColumns() const
bool g_enable_smem_group_by true
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:157
bool isAppendMode() const
Checks if the table is in append mode.
std::mutex & getChunkMutex(const int col_id) const
void defer_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHECK_LE(x, y)
Definition: Logger.h:233
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
void append_data_block_to_chunk(const foreign_storage::IterativeFileScanParameters &file_scan_param, DataBlockPtr data_block, size_t row_count, const ChunkKey &chunk_key, const ColumnDescriptor *column, const std::set< size_t > &rejected_row_indices, const size_t element_count_required)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
virtual size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:79
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const ParseBufferResult &result, const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::map< std::string, std::string, std::less<>> OptionsMap
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
std::string write_to_string(const rapidjson::Document &document)
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
virtual ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const =0
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:621
std::optional< ParseBufferRequest > get_next_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
void throw_unexpected_number_of_items(const size_t &num_expected, const size_t &num_loaded, const std::string &item_type)
int8_t * numbersPtr
Definition: sqltypes.h:221
virtual std::set< std::string > checkForRolledOffFiles(const shared::FilePathOptions &file_path_options)
Definition: FileReader.cpp:615
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void dispatch_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool disable_cache)
bool is_array() const
Definition: sqltypes.h:583
virtual std::string getCurrentFilePath() const =0
#define IS_GEO_POLY(T)
Definition: sqltypes.h:303