OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AbstractTextFileDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
32 #include "Shared/JsonUtils.h"
33 #include "Shared/misc.h"
34 
35 namespace foreign_storage {
37  : db_id_(-1)
38  , foreign_table_(nullptr)
39  , user_mapping_(nullptr)
40  , disable_cache_(false)
41  , is_first_file_scan_call_(true)
42  , is_file_scan_in_progress_(false)
43  , iterative_scan_last_fragment_id_(-1) {}
44 
46  const int db_id,
47  const ForeignTable* foreign_table)
48  : db_id_(db_id)
49  , foreign_table_(foreign_table)
50  , is_restored_(false)
51  , user_mapping_(nullptr)
52  , disable_cache_(false)
53  , is_first_file_scan_call_(true)
54  , is_file_scan_in_progress_(false)
55  , iterative_scan_last_fragment_id_(-1) {}
56 
58  const int db_id,
59  const ForeignTable* foreign_table,
60  const UserMapping* user_mapping,
61  const bool disable_cache)
62  : db_id_(db_id)
63  , foreign_table_(foreign_table)
64  , is_restored_(false)
65  , user_mapping_(user_mapping)
66  , disable_cache_(disable_cache)
67  , is_first_file_scan_call_(true)
68  , is_file_scan_in_progress_(false)
69  , iterative_scan_last_fragment_id_(-1) {}
70 
71 namespace {
72 
74  const int32_t fragment_id,
75  const int32_t max_fragment_id) {
77  "Attempting to populate fragment id " + std::to_string(fragment_id) +
78  " for foreign table " + table->tableName +
79  " which is greater than the maximum fragment id of " +
80  std::to_string(max_fragment_id) + "."};
81 }
82 
83 std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
84  const Catalog_Namespace::Catalog& catalog,
85  const int32_t table_id,
86  const int fragment_id) {
87  CHECK(!buffers.empty());
88  std::set<const ColumnDescriptor*> columns;
89  for (const auto& entry : buffers) {
90  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
91  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
92  const auto column = catalog.getMetadataForColumn(table_id, column_id);
93  columns.emplace(column);
94  }
95  return columns;
96 }
97 
98 bool skip_metadata_scan(const ColumnDescriptor* column) {
99  return column->columnType.is_dict_encoded_type();
100 }
101 } // namespace
102 
104  const std::set<const ColumnDescriptor*>& columns,
105  const int fragment_id,
106  const ChunkToBufferMap& buffers,
107  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
108  for (const auto column : columns) {
109  ChunkKey data_chunk_key = {
110  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
111  init_chunk_for_column(data_chunk_key,
113  buffers,
114  column_id_to_chunk_map[column->columnId]);
115  }
116 }
117 
119  const ChunkToBufferMap& required_buffers,
120  const ChunkToBufferMap& optional_buffers,
121  AbstractBuffer* delete_buffer) {
122  auto timer = DEBUG_TIMER(__func__);
124  CHECK(catalog);
125  CHECK(!required_buffers.empty());
126 
127  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
128  auto required_columns =
129  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
130  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
132  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
133 
134  if (!optional_buffers.empty()) {
135  auto optional_columns =
136  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
138  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
139  }
140  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
142  updateMetadata(column_id_to_chunk_map, fragment_id);
143  }
144 }
145 
146 // if column was skipped during scan, update metadata now
148  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
149  int fragment_id) {
151  CHECK(catalog);
152  for (auto& entry : column_id_to_chunk_map) {
153  const auto& column =
154  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
155  if (skip_metadata_scan(column)) {
156  ChunkKey data_chunk_key = {
157  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
158  if (column->columnType.is_varlen_indeed()) {
159  data_chunk_key.emplace_back(1);
160  }
161  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
162  // Allocate new shared_ptr for metadata so we dont modify old one which may be
163  // used by executor
164  auto cached_metadata_previous =
165  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
166  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
167  std::make_shared<ChunkMetadata>();
168  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
169  *cached_metadata = *cached_metadata_previous;
170  auto chunk_metadata =
171  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
172  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
173  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
174  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
175  cached_metadata->numBytes = entry.second.getBuffer()->size();
176  }
177  }
178 }
179 
185  size_t file_offset;
186  size_t row_count;
187  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
188  std::set<size_t> rejected_row_indices;
189 
190  bool operator<(const ParseFileRegionResult& other) const {
191  return file_offset < other.file_offset;
192  }
193 };
194 
195 namespace {
196 void throw_unexpected_number_of_items(const size_t num_expected,
197  const size_t num_loaded,
198  const std::string& item_type,
199  const std::string& foreign_table_name) {
200  try {
202  num_expected, num_loaded, item_type);
203  } catch (const foreign_storage::ForeignStorageException& except) {
205  std::string(except.what()) + " Foreign table: " + foreign_table_name);
206  }
207 }
208 
209 } // namespace
210 
216  const FileRegions& file_regions,
217  const size_t start_index,
218  const size_t end_index,
219  FileReader& file_reader,
220  ParseBufferRequest& parse_file_request,
221  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
222  const TextFileBufferParser& parser) {
223  auto timer = DEBUG_TIMER(__func__);
224  ParseFileRegionResult load_file_region_result{};
225  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
226  load_file_region_result.row_count = 0;
227 
229  for (size_t i = start_index; i <= end_index; i++) {
230  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
231  auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
232  file_regions[i].first_row_file_offset,
233  file_regions[i].region_size);
234  if (file_regions[i].region_size != read_size) {
235  throw_unexpected_number_of_items(file_regions[i].region_size,
236  read_size,
237  "bytes",
238  parse_file_request.getTableName());
239  }
240  parse_file_request.begin_pos = 0;
241  parse_file_request.end_pos = file_regions[i].region_size;
242  parse_file_request.first_row_index = file_regions[i].first_row_index;
243  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
244  parse_file_request.process_row_count = file_regions[i].row_count;
245 
246  result = parser.parseBuffer(parse_file_request, (i == end_index));
247  CHECK_EQ(file_regions[i].row_count, result.row_count);
248  for (const auto& rejected_row_index : result.rejected_rows) {
249  load_file_region_result.rejected_row_indices.insert(
250  load_file_region_result.row_count + rejected_row_index);
251  }
252  load_file_region_result.row_count += result.row_count;
253  }
254  load_file_region_result.column_id_to_data_blocks_map =
256  return load_file_region_result;
257 }
258 
259 namespace {
260 
264 size_t get_buffer_size(const import_export::CopyParams& copy_params,
265  const bool size_known,
266  const size_t file_size) {
267  size_t buffer_size = copy_params.buffer_size;
268  if (size_known && file_size < buffer_size) {
269  buffer_size = file_size + 1; // +1 for end of line character, if missing
270  }
271  return buffer_size;
272 }
273 
274 size_t get_buffer_size(const FileRegions& file_regions) {
275  size_t buffer_size = 0;
276  for (const auto& file_region : file_regions) {
277  buffer_size = std::max(buffer_size, file_region.region_size);
278  }
279  CHECK(buffer_size);
280  return buffer_size;
281 }
282 
287 size_t get_thread_count(const import_export::CopyParams& copy_params,
288  const bool size_known,
289  const size_t file_size,
290  const size_t buffer_size) {
291  size_t thread_count = copy_params.threads;
292  if (thread_count == 0) {
293  thread_count = std::thread::hardware_concurrency();
294  }
295  if (size_known && file_size > 0) {
296  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
297  if (num_buffers_in_file < thread_count) {
298  thread_count = num_buffers_in_file;
299  }
300  }
301  CHECK_GT(thread_count, static_cast<size_t>(0));
302  return thread_count;
303 }
304 
305 size_t get_thread_count(const import_export::CopyParams& copy_params,
306  const FileRegions& file_regions) {
307  size_t thread_count = copy_params.threads;
308  if (thread_count == 0) {
309  thread_count =
310  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
311  }
312  CHECK_GT(thread_count, static_cast<size_t>(0));
313  return thread_count;
314 }
315 
317  const size_t chunk_element_count) {
318  if (delete_buffer->size() < chunk_element_count) {
319  auto remaining_rows = chunk_element_count - delete_buffer->size();
320  std::vector<int8_t> data(remaining_rows, false);
321  delete_buffer->append(data.data(), remaining_rows);
322  }
323 }
324 
326  std::unique_lock<std::mutex> deferred_requests_lock(
327  multi_threading_params.deferred_requests_mutex);
328  return multi_threading_params.deferred_requests.empty();
329 }
330 
331 bool is_file_scan_finished(const FileReader* file_reader,
332  MetadataScanMultiThreadingParams& multi_threading_params) {
333  CHECK(file_reader);
334  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
335 }
336 
337 } // namespace
338 
340  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
341  int fragment_id,
342  AbstractBuffer* delete_buffer) {
343  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
344  CHECK(!column_id_to_chunk_map.empty());
345 
346  // check to see if a iterative scan step is required
347  auto file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
348  if (file_regions_it == fragment_id_to_file_regions_map_.end() ||
350  // check to see if there is more foreign data to scan
353  // NOTE: we can only guarantee the current `fragment_id` is fully done
354  // iterative scan if either
355  // 1) the scan is finished OR
356  // 2) `fragment_id+1` exists in the internal map
357  // this is why `fragment_id+1` is checked for below
358  auto file_regions_it_one_ahead =
359  fragment_id_to_file_regions_map_.find(fragment_id + 1);
361  (file_regions_it_one_ahead == fragment_id_to_file_regions_map_.end())) {
362  ChunkMetadataVector chunk_metadata_vector;
363  IterativeFileScanParameters iterative_params{
364  column_id_to_chunk_map, fragment_id, delete_buffer};
365  iterativeFileScan(chunk_metadata_vector, iterative_params);
366  }
367  }
368 
369  file_regions_it = fragment_id_to_file_regions_map_.find(fragment_id);
370  if (file_regions_it == fragment_id_to_file_regions_map_.end()) {
372  is_file_scan_in_progress_ = false; // conclude the iterative scan is finished
374  true; // any subsequent iterative request can assume they will be the first
376  foreign_table_, fragment_id, fragment_id_to_file_regions_map_.rbegin()->first);
377  } else {
378  // iterative scan is required to have loaded all required chunks thus we
379  // can exit early
380  return;
381  }
382  }
383  CHECK(file_regions_it != fragment_id_to_file_regions_map_.end());
384 
385  const auto& file_regions = file_regions_it->second;
386 
387  // File roll off can lead to empty file regions.
388  if (file_regions.empty()) {
389  return;
390  }
391 
392  const auto buffer_size = get_buffer_size(file_regions);
393  const auto thread_count = get_thread_count(copy_params, file_regions);
394 
395  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
396 
397  std::vector<ParseBufferRequest> parse_file_requests{};
398  parse_file_requests.reserve(thread_count);
399  std::set<int> column_filter_set;
400  for (const auto& pair : column_id_to_chunk_map) {
401  column_filter_set.insert(pair.first);
402  }
403 
404  std::vector<std::unique_ptr<FileReader>> file_readers;
405  rapidjson::Value reader_metadata(rapidjson::kObjectType);
406  rapidjson::Document d;
407  auto& server_options = foreign_table_->foreign_server->options;
409  file_reader_->serialize(reader_metadata, d.GetAllocator());
410  const auto file_path = getFullFilePath(foreign_table_);
411  auto& parser = getFileBufferParser();
412  std::vector<size_t> start_indices, end_indices;
413 
414  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
415  parse_file_requests.emplace_back(buffer_size,
416  copy_params,
417  db_id_,
419  column_filter_set,
420  file_path,
421  delete_buffer != nullptr);
422  auto start_index = i;
423  start_indices.emplace_back(start_index);
424  end_indices.emplace_back(
425  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1));
426 
427  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
428  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
429  file_path, copy_params, reader_metadata));
430  } else {
431  UNREACHABLE();
432  }
433  }
434 
435  CHECK_EQ(start_indices.size(), file_readers.size());
436  CHECK_EQ(start_indices.size(), parse_file_requests.size());
437  CHECK_EQ(start_indices.size(), end_indices.size());
438 
439  // All objects that futures depend on and/or that are passed by reference
440  // should be defined and/or instantiated before the loop that creates the
441  // futures (below) to avoid possible issues related to exception propagation
442  std::vector<std::future<ParseFileRegionResult>> futures{};
443  for (size_t i = 0; i < file_readers.size(); i++) {
444  futures.emplace_back(std::async(std::launch::async,
446  std::ref(file_regions),
447  start_indices[i],
448  end_indices[i],
449  std::ref(*(file_readers[i])),
450  std::ref(parse_file_requests[i]),
451  std::ref(column_id_to_chunk_map),
452  std::ref(parser)));
453  }
454 
455  for (auto& future : futures) {
456  future.wait();
457  }
458 
459  std::vector<ParseFileRegionResult> load_file_region_results{};
460  for (auto& future : futures) {
461  load_file_region_results.emplace_back(future.get());
462  }
463 
464  std::set<size_t> chunk_rejected_row_indices;
465  size_t chunk_offset = 0;
466  for (auto result : load_file_region_results) {
467  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
468  chunk.appendData(
469  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
470  }
471  for (const auto& rejected_row_index : result.rejected_row_indices) {
472  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
473  }
474  chunk_offset += result.row_count;
475  }
476 
477  if (delete_buffer) {
478  // ensure delete buffer is sized appropriately
479  resize_delete_buffer(delete_buffer, chunk_offset);
480 
481  auto delete_buffer_data = delete_buffer->getMemoryPtr();
482  for (const auto rejected_row_index : chunk_rejected_row_indices) {
483  delete_buffer_data[rejected_row_index] = true;
484  }
485  }
486 }
487 
493 size_t num_rows_to_process(const size_t start_row_index,
494  const size_t max_fragment_size,
495  const size_t rows_remaining) {
496  size_t start_position_in_fragment = start_row_index % max_fragment_size;
497  return std::min<size_t>(rows_remaining, max_fragment_size - start_position_in_fragment);
498 }
499 
506 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
507  const size_t max_fragment_size,
508  const size_t buffer_row_count) {
509  CHECK(buffer_row_count > 0);
510  std::vector<size_t> partitions{};
511  size_t remaining_rows_in_last_fragment;
512  if (start_row_index % max_fragment_size == 0) {
513  remaining_rows_in_last_fragment = 0;
514  } else {
515  remaining_rows_in_last_fragment =
516  max_fragment_size - (start_row_index % max_fragment_size);
517  }
518  if (buffer_row_count <= remaining_rows_in_last_fragment) {
519  partitions.emplace_back(buffer_row_count);
520  } else {
521  if (remaining_rows_in_last_fragment > 0) {
522  partitions.emplace_back(remaining_rows_in_last_fragment);
523  }
524  size_t remaining_buffer_row_count =
525  buffer_row_count - remaining_rows_in_last_fragment;
526  while (remaining_buffer_row_count > 0) {
527  partitions.emplace_back(
528  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
529  remaining_buffer_row_count -= partitions.back();
530  }
531  }
532  return partitions;
533 }
534 
539 std::optional<ParseBufferRequest> get_next_scan_request(
540  MetadataScanMultiThreadingParams& multi_threading_params) {
541  std::unique_lock<std::mutex> pending_requests_lock(
542  multi_threading_params.pending_requests_mutex);
543  multi_threading_params.pending_requests_condition.wait(
544  pending_requests_lock, [&multi_threading_params] {
545  return !multi_threading_params.pending_requests.empty() ||
546  !multi_threading_params.continue_processing;
547  });
548  if (multi_threading_params.pending_requests.empty()) {
549  return {};
550  }
551  auto request = std::move(multi_threading_params.pending_requests.front());
552  multi_threading_params.pending_requests.pop();
553  pending_requests_lock.unlock();
554  multi_threading_params.pending_requests_condition.notify_all();
555  return std::move(request);
556 }
557 
562 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
563  int fragment_id,
564  size_t first_row_index,
565  const ParseBufferResult& result,
566  const std::string& file_path) {
567  fragment_id_to_file_regions_map[fragment_id].emplace_back(
568  // file naming is handled by FileReader
569  FileRegion(file_path,
570  result.row_offsets.front(),
571  first_row_index,
572  result.row_count,
573  result.row_offsets.back() - result.row_offsets.front()));
574 }
575 
580 void update_stats(Encoder* encoder,
581  const SQLTypeInfo& column_type,
582  DataBlockPtr data_block,
583  const size_t row_count) {
584  if (column_type.is_array()) {
585  encoder->updateStats(data_block.arraysPtr, 0, row_count);
586  } else if (!column_type.is_varlen()) {
587  encoder->updateStats(data_block.numbersPtr, row_count);
588  } else {
589  encoder->updateStats(data_block.stringsPtr, 0, row_count);
590  }
591 }
592 namespace {
594  std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
595  const bool disable_cache) {
596  if (!disable_cache && catalog->getDataMgr()
597  .getPersistentStorageMgr()
598  ->getDiskCacheConfig()
599  .isEnabledForFSI()) {
600  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
601  } else {
602  return nullptr;
603  }
604 }
605 } // namespace
606 
607 // If cache is enabled, populate cached_chunks buffers with data blocks
608 void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
609  DataBlockPtr data_block,
610  size_t row_count,
611  ChunkKey& chunk_key,
612  const ColumnDescriptor* column,
613  bool is_first_block,
614  bool disable_cache) {
615  auto catalog =
617  CHECK(catalog);
618  auto cache = get_cache_if_enabled(catalog, disable_cache);
619  if (cache) {
620  // This extra filter needs to be here because this wrapper is the only one that
621  // accesses the cache directly and it should not be inserting chunks which are not
622  // mapped to the current leaf (in distributed mode).
623  if (key_does_not_shard_to_leaf(chunk_key)) {
624  return;
625  }
626 
627  ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
628  chunk_key[CHUNK_KEY_TABLE_IDX],
629  chunk_key[CHUNK_KEY_COLUMN_IDX],
630  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
631  2};
632  // Create actual data chunks to prepopulate cache
633  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
634  cached_chunks[chunk_key] = Chunk_NS::Chunk{column, false};
635  cached_chunks[chunk_key].setBuffer(
636  cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
637  if (column->columnType.is_varlen_indeed()) {
638  cached_chunks[chunk_key].setIndexBuffer(
639  cache->getChunkBufferForPrecaching(index_key, is_first_block));
640  }
641  if (is_first_block) {
642  cached_chunks[chunk_key].initEncoder();
643  }
644  }
645  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
646  }
647 }
648 
650  const foreign_storage::IterativeFileScanParameters& file_scan_param,
651  DataBlockPtr data_block,
652  size_t row_count,
653  const int column_id,
654  const ColumnDescriptor* column,
655  const size_t element_count_required) {
656  auto chunk = shared::get_from_map(file_scan_param.column_id_to_chunk_map, column_id);
657 
658  auto& conditional_variable = file_scan_param.getChunkConditionalVariable(column_id);
659  {
660  std::unique_lock<std::mutex> chunk_lock(file_scan_param.getChunkMutex(column_id));
661  conditional_variable.wait(chunk_lock, [element_count_required, &chunk]() {
662  return chunk.getBuffer()->getEncoder()->getNumElems() == element_count_required;
663  });
664 
665  chunk.appendData(data_block, row_count, 0);
666  }
667 
668  conditional_variable
669  .notify_all(); // notify any threads waiting on the correct element count
670 }
671 
679 std::pair<std::map<int, DataBlockPtr>, std::map<int, DataBlockPtr>> partition_data_blocks(
680  const std::map<int, const ColumnDescriptor*>& column_by_id,
681  const std::map<int, DataBlockPtr>& data_blocks) {
682  std::map<int, DataBlockPtr> dict_encoded_data_blocks;
683  std::map<int, DataBlockPtr> none_dict_encoded_data_blocks;
684  for (auto& [column_id, data_block] : data_blocks) {
685  const auto column = shared::get_from_map(column_by_id, column_id);
686  if (column->columnType.is_dict_encoded_string()) {
687  dict_encoded_data_blocks[column_id] = data_block;
688  } else {
689  none_dict_encoded_data_blocks[column_id] = data_block;
690  }
691  }
692  return {dict_encoded_data_blocks, none_dict_encoded_data_blocks};
693 }
694 
696  const ParseBufferRequest& request,
697  const ParseBufferResult& result,
698  const foreign_storage::IterativeFileScanParameters& file_scan_param,
699  const size_t start_position_in_fragment) {
700  if (file_scan_param.delete_buffer) {
701  std::unique_lock delete_buffer_lock(file_scan_param.delete_buffer_mutex);
702  auto& delete_buffer = file_scan_param.delete_buffer;
703  auto chunk_offset = start_position_in_fragment;
704  auto chunk_element_count = chunk_offset + request.processed_row_count;
705 
706  // ensure delete buffer is sized appropriately
707  resize_delete_buffer(delete_buffer, chunk_element_count);
708 
709  auto delete_buffer_data = delete_buffer->getMemoryPtr();
710  for (const auto rejected_row_index : result.rejected_rows) {
711  CHECK(rejected_row_index + chunk_offset < delete_buffer->size());
712  delete_buffer_data[rejected_row_index + chunk_offset] = true;
713  }
714  }
715 }
716 
718  MetadataScanMultiThreadingParams& multi_threading_params,
719  int fragment_id,
720  const ParseBufferRequest& request,
722  std::map<int, const ColumnDescriptor*>& column_by_id,
723  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
724  const foreign_storage::IterativeFileScanParameters& file_scan_param,
725  const size_t expected_current_element_count) {
726  std::unique_lock<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
727  // File regions should be added in same order as appendData
728  add_file_region(fragment_id_to_file_regions_map,
729  fragment_id,
730  request.first_row_index,
731  result,
732  request.getFilePath());
733  CHECK_EQ(fragment_id, file_scan_param.fragment_id);
734 
735  // start string encoding asynchronously
736  std::vector<std::pair<const size_t, std::future<int8_t*>>>
737  encoded_data_block_ptrs_futures;
738 
739  for (const auto& import_buffer : request.import_buffers) {
740  if (import_buffer == nullptr) {
741  continue;
742  }
743 
744  if (import_buffer->getTypeInfo().is_dict_encoded_string()) {
745  auto string_payload_ptr = import_buffer->getStringBuffer();
746  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
747 
748  auto column_id = import_buffer->getColumnDesc()->columnId;
749  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
750  column_id, std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
751  import_buffer->addDictEncodedString(*string_payload_ptr);
752  return import_buffer->getStringDictBuffer();
753  })));
754  }
755  }
756 
757  auto process_subset_of_data_blocks =
758  [&](const std::map<int, DataBlockPtr>& data_blocks) {
759  for (auto& [column_id, data_block] : data_blocks) {
760  const auto column = column_by_id[column_id];
761  lock.unlock(); // unlock the fragment based lock in order to achieve better
762  // performance
763  append_data_block_to_chunk(file_scan_param,
764  data_block,
765  result.row_count,
766  column_id,
767  column,
768  expected_current_element_count);
769  lock.lock();
770  }
771  };
772 
773  auto [dict_encoded_data_blocks, none_dict_encoded_data_blocks] =
775 
776  process_subset_of_data_blocks(
777  none_dict_encoded_data_blocks); // skip dict string columns
778 
779  // wait for the async requests we made for string dictionary
780  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
781  encoded_ptr_future.second.wait();
782  }
783  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
784  CHECK_GT(dict_encoded_data_blocks.count(encoded_ptr_future.first), 0UL);
785  dict_encoded_data_blocks[encoded_ptr_future.first].numbersPtr =
786  encoded_ptr_future.second.get();
787  }
788 
789  process_subset_of_data_blocks(
790  dict_encoded_data_blocks); // process only dict string columns
791 }
792 
799  int fragment_id,
800  const ParseBufferRequest& request,
802  std::map<int, const ColumnDescriptor*>& column_by_id,
803  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
804  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
805  // File regions should be added in same order as appendData
806  add_file_region(fragment_id_to_file_regions_map,
807  fragment_id,
808  request.first_row_index,
809  result,
810  request.getFilePath());
811 
812  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
813  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
814  const auto column = column_by_id[column_id];
815  if (column->columnType.is_varlen_indeed()) {
816  chunk_key.emplace_back(1);
817  }
818  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
819  multi_threading_params.chunk_encoder_buffers.end()) {
820  multi_threading_params.chunk_encoder_buffers[chunk_key] =
821  std::make_unique<ForeignStorageBuffer>();
822  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
823  column->columnType);
824  }
825  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
826  column->columnType,
827  data_block,
828  result.row_count);
829  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
830  ->getEncoder()
831  ->getNumElems() +
832  result.row_count;
833  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
834  num_elements);
835  cache_blocks(
836  multi_threading_params.cached_chunks,
837  data_block,
838  result.row_count,
839  chunk_key,
840  column,
841  (num_elements - result.row_count) == 0, // Is the first block added to this chunk
842  multi_threading_params.disable_cache);
843  }
844 }
845 
851  ParseBufferRequest& request) {
852  std::unique_lock<std::mutex> completed_requests_queue_lock(
853  multi_threading_params.request_pool_mutex);
854  multi_threading_params.request_pool.emplace(std::move(request));
855  completed_requests_queue_lock.unlock();
856  multi_threading_params.request_pool_condition.notify_all();
857 }
858 
863 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
864  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
865  const TextFileBufferParser& parser) {
866  std::map<int, const ColumnDescriptor*> column_by_id{};
867  while (true) {
868  auto request_opt = get_next_scan_request(multi_threading_params);
869  if (!request_opt.has_value()) {
870  break;
871  }
872  auto& request = request_opt.value();
873  try {
874  if (column_by_id.empty()) {
875  for (const auto column : request.getColumns()) {
876  column_by_id[column->columnId] = column;
877  }
878  }
879  auto partitions = partition_by_fragment(
880  request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
881  request.begin_pos = 0;
882  size_t row_index = request.first_row_index;
883  for (const auto partition : partitions) {
884  request.process_row_count = partition;
885  for (const auto& import_buffer : request.import_buffers) {
886  if (import_buffer != nullptr) {
887  import_buffer->clear();
888  }
889  }
890  auto result = parser.parseBuffer(request, true);
891  int fragment_id = row_index / request.getMaxFragRows();
892  process_data_blocks(multi_threading_params,
893  fragment_id,
894  request,
895  result,
896  column_by_id,
897  fragment_id_to_file_regions_map);
898  row_index += result.row_count;
899  request.begin_pos = result.row_offsets.back() - request.file_offset;
900  }
901  } catch (...) {
902  // Re-add request to pool so we dont block any other threads
903  {
904  std::lock_guard<std::mutex> pending_requests_lock(
905  multi_threading_params.pending_requests_mutex);
906  multi_threading_params.continue_processing = false;
907  }
908  add_request_to_pool(multi_threading_params, request);
909  throw;
910  }
911  add_request_to_pool(multi_threading_params, request);
912  }
913 }
914 
919  MetadataScanMultiThreadingParams& multi_threading_params) {
920  std::unique_lock<std::mutex> request_pool_lock(
921  multi_threading_params.request_pool_mutex);
922  multi_threading_params.request_pool_condition.wait(
923  request_pool_lock,
924  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
925  auto request = std::move(multi_threading_params.request_pool.front());
926  multi_threading_params.request_pool.pop();
927  request_pool_lock.unlock();
928  CHECK(request.buffer);
929  return request;
930 }
931 
933  std::unique_lock<std::mutex> request_pool_lock(
934  multi_threading_params.request_pool_mutex);
935  return !multi_threading_params.request_pool.empty();
936 }
937 
938 /*
939  * Defer processing a request until next iteration. The use case for this is
940  * during an iterative file scan, some requests must defer processing until
941  * the correct fragment is being processed.
942  */
944  ParseBufferRequest& request) {
945  std::unique_lock<std::mutex> deferred_requests_lock(
946  multi_threading_params.deferred_requests_mutex);
947  multi_threading_params.deferred_requests.emplace(std::move(request));
948 }
949 
950 /*
951  * Dispatch all requests that are currently deferred onto the pending request queue.
952  */
954  MetadataScanMultiThreadingParams& multi_threading_params) {
955  std::unique_lock<std::mutex> deferred_requests_lock(
956  multi_threading_params.deferred_requests_mutex);
957  {
958  std::unique_lock<std::mutex> pending_requests_lock(
959  multi_threading_params.pending_requests_mutex);
960 
961  while (!multi_threading_params.deferred_requests.empty()) {
962  auto& request = multi_threading_params.deferred_requests.front();
963  multi_threading_params.pending_requests.emplace(std::move(request));
964  multi_threading_params.deferred_requests.pop();
965  }
966  multi_threading_params.pending_requests_condition.notify_all();
967  }
968 }
969 
975  ParseBufferRequest& request) {
976  {
977  std::unique_lock<std::mutex> pending_requests_lock(
978  multi_threading_params.pending_requests_mutex);
979  multi_threading_params.pending_requests.emplace(std::move(request));
980  }
981  multi_threading_params.pending_requests_condition.notify_all();
982 }
983 
989  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
992  std::map<int, const ColumnDescriptor*> column_by_id{};
993  while (true) {
994  auto request_opt = get_next_scan_request(multi_threading_params);
995  if (!request_opt.has_value()) {
996  break;
997  }
998  ParseBufferRequest& request = request_opt.value();
999  try {
1000  if (column_by_id.empty()) {
1001  for (const auto column : request.getColumns()) {
1002  column_by_id[column->columnId] = column;
1003  }
1004  }
1005  CHECK_LE(request.processed_row_count, request.buffer_row_count);
1006  for (size_t num_rows_left_to_process =
1007  request.buffer_row_count - request.processed_row_count;
1008  num_rows_left_to_process > 0;
1009  num_rows_left_to_process =
1010  request.buffer_row_count - request.processed_row_count) {
1011  // NOTE: `request.begin_pos` state is required to be set correctly by this point
1012  // in execution
1013  size_t row_index = request.first_row_index + request.processed_row_count;
1014  int fragment_id = row_index / request.getMaxFragRows();
1015  if (fragment_id >
1016  file_scan_param.fragment_id) { // processing must continue next iteration
1017  defer_scan_request(multi_threading_params, request);
1018  return;
1019  }
1021  row_index, request.getMaxFragRows(), num_rows_left_to_process);
1022  for (const auto& import_buffer : request.import_buffers) {
1023  if (import_buffer != nullptr) {
1024  import_buffer->clear();
1025  }
1026  }
1027  auto result = parser.parseBuffer(request, true, true, true);
1028  size_t start_position_in_fragment = row_index % request.getMaxFragRows();
1029  populate_chunks_using_data_blocks(multi_threading_params,
1030  fragment_id,
1031  request,
1032  result,
1033  column_by_id,
1034  fragment_id_to_file_regions_map,
1035  file_scan_param,
1036  start_position_in_fragment);
1037 
1038  request.processed_row_count += result.row_count;
1039  request.begin_pos = result.row_offsets.back() - request.file_offset;
1040 
1042  request, result, file_scan_param, start_position_in_fragment);
1043  }
1044 
1045  } catch (...) {
1046  // Re-add request to pool so we dont block any other threads
1047  {
1048  std::lock_guard<std::mutex> pending_requests_lock(
1049  multi_threading_params.pending_requests_mutex);
1050  multi_threading_params.continue_processing = false;
1051  }
1052  add_request_to_pool(multi_threading_params, request);
1053  throw;
1054  }
1055  add_request_to_pool(multi_threading_params, request);
1056  }
1057 }
1058 
1063 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
1064  size_t& buffer_size,
1065  const size_t alloc_size) {
1066  CHECK_LE(buffer_size, alloc_size);
1067  if (buffer_size < alloc_size) {
1068  buffer = std::make_unique<char[]>(alloc_size);
1069  buffer_size = alloc_size;
1070  }
1071 }
1072 
1074  foreign_storage::MetadataScanMultiThreadingParams& multi_threading_params) {
1075  multi_threading_params.request_pool = {};
1076  multi_threading_params.cached_chunks = {};
1077  multi_threading_params.pending_requests = {};
1078  multi_threading_params.deferred_requests = {};
1079  multi_threading_params.chunk_encoder_buffers.clear();
1080 }
1081 
1087  const foreign_storage::ForeignTable* table,
1088  const size_t& buffer_size,
1089  const std::string& file_path,
1090  FileReader& file_reader,
1091  const import_export::CopyParams& copy_params,
1092  MetadataScanMultiThreadingParams& multi_threading_params,
1093  size_t& first_row_index_in_buffer,
1094  size_t& current_file_offset,
1096  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1098  iterative_residual_buffer,
1099  const bool is_first_file_scan_call,
1100  int& iterative_scan_last_fragment_id) {
1101  auto& alloc_size = iterative_residual_buffer.alloc_size;
1102  auto& residual_buffer = iterative_residual_buffer.residual_data;
1103  auto& residual_buffer_size = iterative_residual_buffer.residual_buffer_size;
1104  auto& residual_buffer_alloc_size = iterative_residual_buffer.residual_buffer_alloc_size;
1105 
1106  if (is_first_file_scan_call) {
1107  alloc_size = buffer_size;
1108  residual_buffer = std::make_unique<char[]>(alloc_size);
1109  residual_buffer_size = 0;
1110  residual_buffer_alloc_size = alloc_size;
1111  } else if (!no_deferred_requests(multi_threading_params)) {
1112  dispatch_all_deferred_requests(multi_threading_params);
1113  }
1114 
1115  // NOTE: During an interactive scan, it is possible for an entire fragment to
1116  // be parsed into requests, which sit in deferred requests; in order to avoid
1117  // stalling indefinitely while waiting on an available requests, the check
1118  // below determines if this is the case or not.
1119  bool current_fragment_fully_read_during_iterative_scan =
1120  file_scan_param && file_scan_param->fragment_id < iterative_scan_last_fragment_id;
1121 
1122  // NOTE: The conditional below behaves as follows:
1123  //
1124  // * for non-iterative scans,
1125  // current_fragment_fully_read_during_iterative_scan is false, and therefore
1126  // only the first clause of the conditional is relevant
1127  //
1128  // * for interactive scans, if
1129  // current_fragment_fully_read_during_iterative_scan is true, then the
1130  // conditional is skipped, unless it is determined there are still available
1131  // requests to work with, in which case the loop is entered; this is an
1132  // optimization that ensures maximal concurrency of loading while processing
1133  // requests
1134  while (!file_reader.isScanFinished() &&
1135  (request_pool_non_empty(multi_threading_params) ||
1136  !current_fragment_fully_read_during_iterative_scan)) {
1137  {
1138  std::lock_guard<std::mutex> pending_requests_lock(
1139  multi_threading_params.pending_requests_mutex);
1140  if (!multi_threading_params.continue_processing) {
1141  break;
1142  }
1143  }
1144  auto request = get_request_from_pool(multi_threading_params);
1145  request.full_path = file_reader.getCurrentFilePath();
1146  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
1147 
1148  if (residual_buffer_size > 0) {
1149  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
1150  }
1151  size_t size = residual_buffer_size;
1152  size += file_reader.read(request.buffer.get() + residual_buffer_size,
1153  alloc_size - residual_buffer_size);
1154 
1155  if (size == 0) {
1156  // In some cases at the end of a file we will read 0 bytes even when
1157  // file_reader.isScanFinished() is false. Also add request back to the pool to be
1158  // picked up again in the next iteration.
1159  add_request_to_pool(multi_threading_params, request);
1160  continue;
1161  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
1162  // In some cases files with newlines at the end will be encoded with a second
1163  // newline that can end up being the only thing in the buffer. Also add request
1164  // back to the pool to be picked up again in the next iteration.
1165  current_file_offset++;
1166  add_request_to_pool(multi_threading_params, request);
1167  continue;
1168  }
1169  unsigned int num_rows_in_buffer = 0;
1170  request.end_pos = parser.findRowEndPosition(alloc_size,
1171  request.buffer,
1172  size,
1173  copy_params,
1174  first_row_index_in_buffer,
1175  num_rows_in_buffer,
1176  &file_reader);
1177  request.buffer_size = size;
1178  request.buffer_alloc_size = alloc_size;
1179  request.first_row_index = first_row_index_in_buffer;
1180  request.file_offset = current_file_offset;
1181  request.buffer_row_count = num_rows_in_buffer;
1182  request.processed_row_count = 0;
1183  request.begin_pos = 0;
1184 
1185  residual_buffer_size = size - request.end_pos;
1186  if (residual_buffer_size > 0) {
1187  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
1188  memcpy(residual_buffer.get(),
1189  request.buffer.get() + request.end_pos,
1190  residual_buffer_size);
1191  }
1192 
1193  current_file_offset += request.end_pos;
1194  first_row_index_in_buffer += num_rows_in_buffer;
1195 
1196  if (num_rows_in_buffer > 0) {
1197  dispatch_scan_request(multi_threading_params, request);
1198  } else {
1199  add_request_to_pool(multi_threading_params, request);
1200  }
1201 
1202  if (file_scan_param) {
1203  const int32_t last_fragment_index =
1204  (first_row_index_in_buffer) / table->maxFragRows;
1205  if (last_fragment_index > file_scan_param->fragment_id) {
1206  iterative_scan_last_fragment_id = last_fragment_index;
1207  break;
1208  }
1209  }
1210  }
1211 
1212  std::unique_lock<std::mutex> pending_requests_queue_lock(
1213  multi_threading_params.pending_requests_mutex);
1214  multi_threading_params.pending_requests_condition.wait(
1215  pending_requests_queue_lock, [&multi_threading_params] {
1216  return multi_threading_params.pending_requests.empty() ||
1217  (multi_threading_params.continue_processing == false);
1218  });
1219  multi_threading_params.continue_processing = false;
1220  pending_requests_queue_lock.unlock();
1221  multi_threading_params.pending_requests_condition.notify_all();
1222 }
1223 
1225  const foreign_storage::ForeignTable* table,
1226  const size_t& buffer_size,
1227  const std::string& file_path,
1228  FileReader& file_reader,
1229  const import_export::CopyParams& copy_params,
1230  MetadataScanMultiThreadingParams& multi_threading_params,
1231  size_t& first_row_index_in_buffer,
1232  size_t& current_file_offset,
1234  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1236  iterative_residual_buffer,
1237  const bool is_first_file_scan_call,
1238  int& iterative_scan_last_fragment_id) {
1239  try {
1240  dispatch_scan_requests(table,
1241  buffer_size,
1242  file_path,
1243  file_reader,
1244  copy_params,
1245  multi_threading_params,
1246  first_row_index_in_buffer,
1247  current_file_offset,
1248  parser,
1249  file_scan_param,
1250  iterative_residual_buffer,
1251  is_first_file_scan_call,
1252  iterative_scan_last_fragment_id);
1253  } catch (...) {
1254  {
1255  std::unique_lock<std::mutex> pending_requests_lock(
1256  multi_threading_params.pending_requests_mutex);
1257  multi_threading_params.continue_processing = false;
1258  }
1259  multi_threading_params.pending_requests_condition.notify_all();
1260  throw;
1261  }
1262 }
1263 
1265  const foreign_storage::ForeignTable* table,
1266  const size_t& buffer_size,
1267  const std::string& file_path,
1268  FileReader& file_reader,
1269  const import_export::CopyParams& copy_params,
1270  MetadataScanMultiThreadingParams& multi_threading_params,
1271  size_t& first_row_index_in_buffer,
1272  size_t& current_file_offset,
1274  const foreign_storage::IterativeFileScanParameters* file_scan_param,
1276  iterative_residual_buffer,
1277  const bool is_first_file_scan_call) {
1278  int dummy;
1280  buffer_size,
1281  file_path,
1282  file_reader,
1283  copy_params,
1284  multi_threading_params,
1285  first_row_index_in_buffer,
1286  current_file_offset,
1287  parser,
1288  file_scan_param,
1289  iterative_residual_buffer,
1290  is_first_file_scan_call,
1291  dummy);
1292 }
1293 
1294 namespace {
1295 // Create metadata for unscanned columns
1296 // Any fragments with any updated rows between start_row and num_rows will be updated
1297 // Chunks prior to start_row will be restored from (ie for append
1298 // workflows)
1300  const ColumnDescriptor* column,
1301  const ForeignTable* foreign_table,
1302  const int db_id,
1303  const size_t start_row,
1304  const size_t total_num_rows,
1305  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
1306  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
1307  if (column->columnType.is_varlen_indeed()) {
1308  chunk_key.emplace_back(1);
1309  }
1310 
1311  // Create placeholder metadata for every fragment touched by this scan
1312  int start_fragment = start_row / foreign_table->maxFragRows;
1313  int end_fragment{0};
1314  if (total_num_rows > 0) {
1315  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
1316  }
1317  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
1318  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
1319  (fragment_id + 1)) > total_num_rows)
1320  ? total_num_rows % foreign_table->maxFragRows
1321  : foreign_table->maxFragRows;
1322 
1323  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
1324  chunk_metadata_map[chunk_key] =
1325  get_placeholder_metadata(column->columnType, num_elements);
1326  }
1327 }
1328 
1330  const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
1331  const std::map<int, FileRegions>& fragment_id_to_file_regions_map,
1332  const foreign_storage::OptionsMap& server_options,
1333  std::unique_ptr<FileReader>& file_reader,
1334  const std::string& file_path,
1335  const import_export::CopyParams& copy_params,
1336  const shared::FilePathOptions& file_path_options,
1337  const std::optional<size_t>& max_file_count,
1338  const foreign_storage::ForeignTable* foreign_table,
1339  const foreign_storage::UserMapping* user_mapping,
1341  std::function<std::string()> get_s3_key,
1342  size_t& num_rows,
1343  size_t& append_start_offset) {
1344  // Should only be called once for non-append tables
1345  CHECK(chunk_metadata_map.empty());
1346  CHECK(fragment_id_to_file_regions_map.empty());
1348  ->second ==
1350  file_reader = std::make_unique<LocalMultiFileReader>(
1351  file_path, copy_params, file_path_options, max_file_count);
1352  } else {
1353  UNREACHABLE();
1354  }
1355  parser.validateFiles(file_reader.get(), foreign_table);
1356  num_rows = 0;
1357  append_start_offset = 0;
1358 }
1359 } // namespace
1360 
1374  ChunkMetadataVector& chunk_metadata_vector) {
1375  auto timer = DEBUG_TIMER(__func__);
1376 
1377  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1378  const auto file_path = getFullFilePath(foreign_table_);
1380  CHECK(catalog);
1381  auto& parser = getFileBufferParser();
1382  const auto file_path_options = getFilePathOptions(foreign_table_);
1383  auto& server_options = foreign_table_->foreign_server->options;
1384  std::set<std::string> rolled_off_files;
1385  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
1386  auto multi_file_reader = dynamic_cast<MultiFileReader*>(file_reader_.get());
1387  if (allowFileRollOff(foreign_table_) && multi_file_reader) {
1388  rolled_off_files = multi_file_reader->checkForRolledOffFiles(file_path_options);
1389  }
1390  parser.validateFiles(file_reader_.get(), foreign_table_);
1391  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1392  file_reader_->checkForMoreRows(append_start_offset_, file_path_options);
1393  } else {
1394  UNREACHABLE();
1395  }
1396  } else {
1400  server_options,
1401  file_reader_,
1402  file_path,
1403  copy_params,
1404  file_path_options,
1405  getMaxFileCount(),
1407  user_mapping_,
1408  parser,
1409  [] { return ""; },
1410  num_rows_,
1412  }
1413 
1414  auto columns =
1415  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1416  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1417  for (auto column : columns) {
1418  column_by_id[column->columnId] = column;
1419  }
1420  MetadataScanMultiThreadingParams multi_threading_params;
1421  multi_threading_params.disable_cache = disable_cache_;
1422 
1423  // Restore previous chunk data
1424  if (foreign_table_->isAppendMode()) {
1425  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
1426  }
1427 
1428  std::set<int> columns_to_scan;
1429  for (auto column : columns) {
1430  if (!skip_metadata_scan(column)) {
1431  columns_to_scan.insert(column->columnId);
1432  }
1433  }
1434 
1435  // Track where scan started for appends
1436  int start_row = num_rows_;
1438  if (!file_reader_->isScanFinished()) {
1439  auto buffer_size = get_buffer_size(copy_params,
1440  file_reader_->isRemainingSizeKnown(),
1441  file_reader_->getRemainingSize());
1442  auto thread_count = get_thread_count(copy_params,
1443  file_reader_->isRemainingSizeKnown(),
1444  file_reader_->getRemainingSize(),
1445  buffer_size);
1446  multi_threading_params.continue_processing = true;
1447 
1448  std::vector<std::future<void>> futures{};
1449  for (size_t i = 0; i < thread_count; i++) {
1450  multi_threading_params.request_pool.emplace(buffer_size,
1451  copy_params,
1452  db_id_,
1454  columns_to_scan,
1456  disable_cache_);
1457 
1458  futures.emplace_back(std::async(std::launch::async,
1459  scan_metadata,
1460  std::ref(multi_threading_params),
1462  std::ref(parser)));
1463  }
1464 
1465  ResidualBuffer residual_buffer;
1467  buffer_size,
1468  file_path,
1469  (*file_reader_),
1470  copy_params,
1471  multi_threading_params,
1472  num_rows_,
1475  nullptr,
1476  residual_buffer,
1477  true);
1478 
1479  for (auto& future : futures) {
1480  // get() instead of wait() because we need to propagate potential exceptions.
1481  future.get();
1482  }
1483  }
1484 
1485  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
1486  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
1487  CHECK(column_entry != column_by_id.end());
1488  const auto& column_type = column_entry->second->columnType;
1489  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
1490  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
1491  const auto& cached_chunks = multi_threading_params.cached_chunks;
1492  if (!column_type.is_varlen_indeed()) {
1493  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
1494  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
1495  chunk_entry != cached_chunks.end()) {
1496  auto cached_buffer = chunk_entry->second.getBuffer();
1497  CHECK(cached_buffer);
1498  chunk_metadata->numBytes = cached_buffer->size();
1499  buffer->setSize(cached_buffer->size());
1500  } else {
1501  chunk_metadata->numBytes = buffer->size();
1502  }
1503  chunk_metadata_map_[chunk_key] = chunk_metadata;
1504  }
1505 
1506  for (auto column : columns) {
1507  if (skip_metadata_scan(column)) {
1509  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
1510  }
1511  }
1512 
1513  if (!rolled_off_files.empty()) {
1514  updateRolledOffChunks(rolled_off_files, column_by_id);
1515  }
1516 
1517  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1518  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
1519  }
1520 
1521  // Save chunk data
1522  if (foreign_table_->isAppendMode()) {
1523  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1524  }
1525 }
1526 
1528  ChunkMetadataVector& chunk_metadata_vector,
1529  IterativeFileScanParameters& file_scan_param) {
1530  auto timer = DEBUG_TIMER(__func__);
1531 
1533 
1535  << " iterative file scan can not be used with APPEND mode.";
1536 
1537  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1538  const auto file_path = getFullFilePath(foreign_table_);
1540  CHECK(catalog);
1541  auto& parser = getFileBufferParser();
1542  const auto file_path_options = getFilePathOptions(foreign_table_);
1543  auto& server_options = foreign_table_->foreign_server->options;
1544 
1550  server_options,
1551  file_reader_,
1552  file_path,
1553  copy_params,
1554  file_path_options,
1555  getMaxFileCount(),
1557  user_mapping_,
1558  parser,
1559  [] { return ""; },
1560  num_rows_,
1562  }
1563 
1564  auto columns =
1565  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
1566  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
1567  for (auto column : columns) {
1568  column_by_id[column->columnId] = column;
1569  }
1570 
1571  if (is_first_file_scan_call_) { // reiniitialize all members that may have state in
1572  // `multi_threading_params_`
1574  }
1575 
1577 
1578  std::set<int> columns_to_scan;
1579  for (auto column : columns) {
1580  columns_to_scan.insert(column->columnId);
1581  }
1582 
1585  // NOTE: `buffer_size_` and `thread_count_` must not change across an iterative
1586  // scan
1587  buffer_size_ = get_buffer_size(copy_params,
1588  file_reader_->isRemainingSizeKnown(),
1589  file_reader_->getRemainingSize());
1590  thread_count_ = get_thread_count(copy_params,
1591  file_reader_->isRemainingSizeKnown(),
1592  file_reader_->getRemainingSize(),
1593  buffer_size_);
1594  }
1596 
1597  std::vector<std::future<void>> futures{};
1598  for (size_t i = 0; i < thread_count_; i++) {
1601  copy_params,
1602  db_id_,
1604  columns_to_scan,
1606  true);
1607  }
1608  futures.emplace_back(std::async(std::launch::async,
1610  std::ref(multi_threading_params_),
1612  std::ref(parser),
1613  std::ref(file_scan_param)));
1614  }
1615 
1617  buffer_size_,
1618  file_path,
1619  (*file_reader_),
1620  copy_params,
1622  num_rows_,
1625  &file_scan_param,
1629 
1630  for (auto& future : futures) {
1631  // get() instead of wait() because we need to propagate potential exceptions.
1632  future.get();
1633  }
1634  }
1635 
1637  is_first_file_scan_call_ = false;
1638  }
1639 
1642  }
1643 }
1644 
1646  const std::set<std::string>& rolled_off_files,
1647  const std::map<int32_t, const ColumnDescriptor*>& column_by_id) {
1648  std::set<int32_t> deleted_fragment_ids;
1649  std::optional<int32_t> partially_deleted_fragment_id;
1650  std::optional<size_t> partially_deleted_fragment_row_count;
1651  for (auto& [fragment_id, file_regions] : fragment_id_to_file_regions_map_) {
1652  bool file_region_deleted{false};
1653  for (auto it = file_regions.begin(); it != file_regions.end();) {
1654  if (shared::contains(rolled_off_files, it->file_path)) {
1655  it = file_regions.erase(it);
1656  file_region_deleted = true;
1657  } else {
1658  it++;
1659  }
1660  }
1661  if (file_regions.empty()) {
1662  deleted_fragment_ids.emplace(fragment_id);
1663  } else if (file_region_deleted) {
1664  partially_deleted_fragment_id = fragment_id;
1665  partially_deleted_fragment_row_count = 0;
1666  for (const auto& file_region : file_regions) {
1667  partially_deleted_fragment_row_count.value() += file_region.row_count;
1668  }
1669  break;
1670  }
1671  }
1672 
1673  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
1674  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
1675  chunk_metadata->numElements = 0;
1676  chunk_metadata->numBytes = 0;
1677  } else if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
1678  CHECK(partially_deleted_fragment_row_count.has_value());
1679  auto old_chunk_stats = chunk_metadata->chunkStats;
1680  auto cd = shared::get_from_map(column_by_id, chunk_key[CHUNK_KEY_COLUMN_IDX]);
1681  chunk_metadata = get_placeholder_metadata(
1682  cd->columnType, partially_deleted_fragment_row_count.value());
1683  // Old chunk stats will still be correct (since only row deletion is occurring)
1684  // and more accurate than that of the placeholder metadata.
1685  chunk_metadata->chunkStats = old_chunk_stats;
1686  }
1687  }
1688 }
1689 
1691  rapidjson::Document d;
1692  d.SetObject();
1693 
1694  // Save fragment map
1697  "fragment_id_to_file_regions_map",
1698  d.GetAllocator());
1699 
1700  // Save reader metadata
1701  rapidjson::Value reader_metadata(rapidjson::kObjectType);
1702  file_reader_->serialize(reader_metadata, d.GetAllocator());
1703  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
1704 
1705  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
1707  d, append_start_offset_, "append_start_offset", d.GetAllocator());
1708 
1709  return json_utils::write_to_string(d);
1710 }
1711 
1713  const std::string& file_path,
1714  const ChunkMetadataVector& chunk_metadata) {
1715  auto d = json_utils::read_from_file(file_path);
1716  CHECK(d.IsObject());
1717 
1718  // Restore fragment map
1720  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
1721 
1722  // Construct reader with metadta
1723  CHECK(d.HasMember("reader_metadata"));
1724  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
1725  const auto full_file_path = getFullFilePath(foreign_table_);
1726  auto& server_options = foreign_table_->foreign_server->options;
1727  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1728  file_reader_ = std::make_unique<LocalMultiFileReader>(
1729  full_file_path, copy_params, d["reader_metadata"]);
1730  } else {
1731  UNREACHABLE();
1732  }
1733 
1735  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1736 
1737  // Now restore the internal metadata maps
1738  CHECK(chunk_metadata_map_.empty());
1739  CHECK(chunk_encoder_buffers_.empty());
1740 
1741  for (auto& pair : chunk_metadata) {
1742  chunk_metadata_map_[pair.first] = pair.second;
1743 
1744  if (foreign_table_->isAppendMode()) {
1745  // Restore encoder state for append mode
1746  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1747  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1748  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1749  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1750  pair.second->numElements);
1751  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1752  pair.second->chunkStats);
1753  chunk_encoder_buffers_[pair.first]->setUpdated();
1754  }
1755  }
1756  is_restored_ = true;
1757 }
1758 
1760  return is_restored_;
1761 }
1762 
1763 std::optional<size_t> AbstractTextFileDataWrapper::getMaxFileCount() const {
1764  return {};
1765 }
1766 
1767 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool contains(const T &container, const U &element)
Definition: misc.h:195
void append_data_block_to_chunk(const foreign_storage::IterativeFileScanParameters &file_scan_param, DataBlockPtr data_block, size_t row_count, const int column_id, const ColumnDescriptor *column, const size_t element_count_required)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
virtual bool isScanFinished() const =0
std::vector< int > ChunkKey
Definition: types.h:36
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
virtual void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const =0
virtual size_t read(void *buffer, size_t max_size)=0
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog, const bool disable_cache)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::string tableName
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
void resize_delete_buffer(AbstractBuffer *delete_buffer, const size_t chunk_element_count)
std::pair< std::map< int, DataBlockPtr >, std::map< int, DataBlockPtr > > partition_data_blocks(const std::map< int, const ColumnDescriptor * > &column_by_id, const std::map< int, DataBlockPtr > &data_blocks)
void populate_chunks_using_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::IterativeFileScanParameters &file_scan_param, const size_t expected_current_element_count)
std::condition_variable & getChunkConditionalVariable(const int col_id) const
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool is_varlen() const
Definition: sqltypes.h:629
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void updateRolledOffChunks(const std::set< std::string > &rolled_off_files, const std::map< int32_t, const ColumnDescriptor * > &column_by_id)
virtual ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const =0
virtual int8_t * getMemoryPtr()=0
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
#define UNREACHABLE()
Definition: Logger.h:338
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void reset_multithreading_params(foreign_storage::MetadataScanMultiThreadingParams &multi_threading_params)
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
size_t num_rows_to_process(const size_t start_row_index, const size_t max_fragment_size, const size_t rows_remaining)
virtual const TextFileBufferParser & getFileBufferParser() const =0
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
std::map< int, Chunk_NS::Chunk > & column_id_to_chunk_map
void dispatch_scan_requests(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
#define CHECK_GT(x, y)
Definition: Logger.h:305
bool no_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)
std::string to_string(char const *&&v)
void iterativeFileScan(ChunkMetadataVector &chunk_metadata_vector, IterativeFileScanParameters &file_scan_param)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
rapidjson::Document read_from_file(const std::string &file_path)
Definition: JsonUtils.cpp:201
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
std::vector< FileRegion > FileRegions
Definition: FileRegions.h:60
MetadataScanMultiThreadingParams multi_threading_params_
bool key_does_not_shard_to_leaf(const ChunkKey &key)
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:343
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
void throw_fragment_id_out_of_bounds_error(const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
void update_delete_buffer(const ParseBufferRequest &request, const ParseBufferResult &result, const foreign_storage::IterativeFileScanParameters &file_scan_param, const size_t start_position_in_fragment)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void populate_chunks(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser, foreign_storage::IterativeFileScanParameters &file_scan_param)
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
virtual std::optional< size_t > getMaxFileCount() const
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void initialize_non_append_mode_scan(const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
std::string write_to_string(const rapidjson::Document &document)
Definition: JsonUtils.cpp:225
bool is_dict_encoded_type() const
Definition: sqltypes.h:653
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
void dispatch_all_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)
specifies the content in-memory of a row in the column metadata table
std::list< const ColumnDescriptor * > getColumns() const
bool g_enable_smem_group_by true
void dispatch_scan_requests_with_exception_handling(const foreign_storage::ForeignTable *table, const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser, const foreign_storage::IterativeFileScanParameters *file_scan_param, foreign_storage::AbstractTextFileDataWrapper::ResidualBuffer &iterative_residual_buffer, const bool is_first_file_scan_call, int &iterative_scan_last_fragment_id)
bool isAppendMode() const
Checks if the table is in append mode.
std::mutex & getChunkMutex(const int col_id) const
bool request_pool_non_empty(MetadataScanMultiThreadingParams &multi_threading_params)
void defer_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHECK_LE(x, y)
Definition: Logger.h:304
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
virtual size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:80
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const ParseBufferResult &result, const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
std::map< std::string, std::string, std::less<>> OptionsMap
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:635
std::optional< ParseBufferRequest > get_next_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
bool is_file_scan_finished(const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
void throw_unexpected_number_of_items(const size_t &num_expected, const size_t &num_loaded, const std::string &item_type)
int8_t * numbersPtr
Definition: sqltypes.h:233
virtual std::set< std::string > checkForRolledOffFiles(const shared::FilePathOptions &file_path_options)
Definition: FileReader.cpp:615
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void dispatch_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool disable_cache)
bool is_array() const
Definition: sqltypes.h:583
virtual std::string getCurrentFilePath() const =0