OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AbstractTextFileDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
30 #include "FsiJsonUtils.h"
31 #include "Shared/misc.h"
32 
33 namespace foreign_storage {
// Default-constructed wrapper: no catalog/table association yet; -1 db id is a
// sentinel for "unset".
// NOTE(review): unlike the ctor below, is_restored_ is not set here — presumably
// it has an in-class initializer; confirm in the header.
    : db_id_(-1), foreign_table_(nullptr) {}

// Wrapper bound to a specific database and foreign table.
    const int db_id,
    const ForeignTable* foreign_table)
    : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
41 
42 namespace {
43 std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
44  const Catalog_Namespace::Catalog& catalog,
45  const int32_t table_id,
46  const int fragment_id) {
47  CHECK(!buffers.empty());
48  std::set<const ColumnDescriptor*> columns;
49  for (const auto& entry : buffers) {
50  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
51  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
52  const auto column = catalog.getMetadataForColumn(table_id, column_id);
53  columns.emplace(column);
54  }
55  return columns;
56 }
57 
58 bool skip_metadata_scan(const ColumnDescriptor* column) {
59  return column->columnType.is_dict_encoded_type();
60 }
61 } // namespace
62 
    const std::set<const ColumnDescriptor*>& columns,
    const int fragment_id,
    const ChunkToBufferMap& buffers,
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
  // For each requested column, build the chunk key for this fragment and
  // initialize a Chunk entry backed by the matching buffer from `buffers`.
  for (const auto column : columns) {
    ChunkKey data_chunk_key = {
        db_id_, foreign_table_->tableId, column->columnId, fragment_id};
    init_chunk_for_column(data_chunk_key,
                          buffers,
                          column_id_to_chunk_map[column->columnId]);
  }
}
77 
    const ChunkToBufferMap& required_buffers,
    const ChunkToBufferMap& optional_buffers) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(catalog);
  CHECK(!required_buffers.empty());

  // All required buffers belong to the same fragment; take its id from the
  // first chunk key.
  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
  auto required_columns =
      get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
      required_columns, fragment_id, required_buffers, column_id_to_chunk_map);

  // Optional buffers (prefetch candidates) are populated in the same pass when
  // provided.
  if (!optional_buffers.empty()) {
    auto optional_columns =
        get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
        optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
  }
  populateChunks(column_id_to_chunk_map, fragment_id);
  updateMetadata(column_id_to_chunk_map, fragment_id);
  // Detach the caller-owned buffers before the local Chunk objects are
  // destroyed so the chunks do not free them.
  for (auto& entry : column_id_to_chunk_map) {
    entry.second.setBuffer(nullptr);
    entry.second.setIndexBuffer(nullptr);
  }
}
106 
// if column was skipped during scan, update metadata now
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id) {
  auto fragmenter = foreign_table_->fragmenter;
  if (fragmenter) {
    CHECK(catalog);
    for (auto& entry : column_id_to_chunk_map) {
      const auto& column =
          catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
      // Only columns that were skipped during the metadata scan (dictionary
      // encoded) have placeholder stats that need to be replaced here.
      if (skip_metadata_scan(column)) {
        ChunkKey data_chunk_key = {
            db_id_, foreign_table_->tableId, column->columnId, fragment_id};
        if (column->columnType.is_varlen_indeed()) {
          data_chunk_key.emplace_back(1);  // sub-key 1 = data buffer of a varlen chunk
        }
        CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
        // Allocate new shared_ptr for metadata so we don't modify old one which may be
        // used by executor
        auto cached_metadata_previous =
            shared::get_from_map(chunk_metadata_map_, data_chunk_key);
        shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
            std::make_shared<ChunkMetadata>();
        auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
        *cached_metadata = *cached_metadata_previous;
        // Pull the real stats from the encoder that just ingested the data.
        auto chunk_metadata =
            entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
        cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
        cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
        cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
        cached_metadata->numBytes = entry.second.getBuffer()->size();
        fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
      }
    }
  }
}
144 
  // Byte offset of the first row of the first region parsed by this task.
  size_t file_offset;
  // Total number of rows parsed across all regions of the task.
  size_t row_count;
  // Parsed column data, keyed by column id.
  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;

  // Order results by position in the file so appends happen in file order.
  bool operator<(const ParseFileRegionResult& other) const {
    return file_offset < other.file_offset;
  }
};
157 
    const FileRegions& file_regions,
    const size_t start_index,
    const size_t end_index,
    FileReader& file_reader,
    ParseBufferRequest& parse_file_request,
    const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    const TextFileBufferParser& parser) {
  ParseFileRegionResult load_file_region_result{};
  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
  load_file_region_result.row_count = 0;

  // Read and parse each region [start_index, end_index] sequentially, reusing
  // the request's buffer for every region.
  for (size_t i = start_index; i <= end_index; i++) {
    CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
    auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
                                            file_regions[i].first_row_file_offset,
                                            file_regions[i].region_size);
    // A region must always be fully readable; a short read is a logic error.
    CHECK_EQ(file_regions[i].region_size, read_size);
    parse_file_request.begin_pos = 0;
    parse_file_request.end_pos = file_regions[i].region_size;
    parse_file_request.first_row_index = file_regions[i].first_row_index;
    parse_file_request.file_offset = file_regions[i].first_row_file_offset;
    parse_file_request.process_row_count = file_regions[i].row_count;

    result = parser.parseBuffer(parse_file_request, i == end_index);
    // Row count was established during the metadata scan; it must match.
    CHECK_EQ(file_regions[i].row_count, result.row_count);
    load_file_region_result.row_count += result.row_count;
  }
  load_file_region_result.column_id_to_data_blocks_map =
  return load_file_region_result;
}
195 
199 size_t get_buffer_size(const import_export::CopyParams& copy_params,
200  const bool size_known,
201  const size_t file_size) {
202  size_t buffer_size = copy_params.buffer_size;
203  if (size_known && file_size < buffer_size) {
204  buffer_size = file_size + 1; // +1 for end of line character, if missing
205  }
206  return buffer_size;
207 }
208 
209 size_t get_buffer_size(const FileRegions& file_regions) {
210  size_t buffer_size = 0;
211  for (const auto& file_region : file_regions) {
212  buffer_size = std::max(buffer_size, file_region.region_size);
213  }
214  CHECK(buffer_size);
215  return buffer_size;
216 }
217 
222 size_t get_thread_count(const import_export::CopyParams& copy_params,
223  const bool size_known,
224  const size_t file_size,
225  const size_t buffer_size) {
226  size_t thread_count = copy_params.threads;
227  if (thread_count == 0) {
228  thread_count = std::thread::hardware_concurrency();
229  }
230  if (size_known) {
231  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
232  if (num_buffers_in_file < thread_count) {
233  thread_count = num_buffers_in_file;
234  }
235  }
236  CHECK_GT(thread_count, static_cast<size_t>(0));
237  return thread_count;
238 }
239 
240 size_t get_thread_count(const import_export::CopyParams& copy_params,
241  const FileRegions& file_regions) {
242  size_t thread_count = copy_params.threads;
243  if (thread_count == 0) {
244  thread_count =
245  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
246  }
247  CHECK_GT(thread_count, static_cast<size_t>(0));
248  return thread_count;
249 }
250 
    std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
    int fragment_id) {
  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);

  CHECK(!column_id_to_chunk_map.empty());
  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
  CHECK(!file_regions.empty());

  const auto buffer_size = get_buffer_size(file_regions);
  const auto thread_count = get_thread_count(copy_params, file_regions);

  // Each thread handles a contiguous batch of file regions (ceiling division).
  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;

  std::vector<ParseBufferRequest> parse_file_requests{};
  parse_file_requests.reserve(thread_count);
  std::vector<std::future<ParseFileRegionResult>> futures{};
  // Only the columns present in the chunk map are parsed.
  std::set<int> column_filter_set;
  for (const auto& pair : column_id_to_chunk_map) {
    column_filter_set.insert(pair.first);
  }

  // Each worker gets its own FileReader, reconstructed from the serialized
  // state of the shared reader, so reads can proceed concurrently.
  std::vector<std::unique_ptr<FileReader>> file_readers;
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  rapidjson::Document d;
  auto& server_options = foreign_table_->foreign_server->options;
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  const auto file_path = getFullFilePath(foreign_table_);
  auto& parser = getFileBufferParser();

  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
    parse_file_requests.emplace_back(
        buffer_size, copy_params, db_id_, foreign_table_, column_filter_set, file_path);
    auto start_index = i;
    auto end_index =
        std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);

    if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
      file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
          file_path, copy_params, reader_metadata));
    } else {
      UNREACHABLE();
    }

    futures.emplace_back(std::async(std::launch::async,
                                    std::ref(file_regions),
                                    start_index,
                                    end_index,
                                    std::ref(*(file_readers.back())),
                                    std::ref(parse_file_requests.back()),
                                    std::ref(column_id_to_chunk_map),
                                    std::ref(parser)));
  }

  for (auto& future : futures) {
    future.wait();
  }

  // Collect results in dispatch order so data is appended in file order.
  std::vector<ParseFileRegionResult> load_file_region_results{};
  for (auto& future : futures) {
    load_file_region_results.emplace_back(future.get());
  }

  // NOTE(review): `result` is taken by value here, copying each data-block map
  // per iteration — a const reference would avoid the copy.
  for (auto result : load_file_region_results) {
    for (auto& [column_id, chunk] : column_id_to_chunk_map) {
      chunk.appendData(
          result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
    }
  }
}
322 
329 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
330  const size_t max_fragment_size,
331  const size_t buffer_row_count) {
332  CHECK(buffer_row_count > 0);
333  std::vector<size_t> partitions{};
334  size_t remaining_rows_in_last_fragment;
335  if (start_row_index % max_fragment_size == 0) {
336  remaining_rows_in_last_fragment = 0;
337  } else {
338  remaining_rows_in_last_fragment =
339  max_fragment_size - (start_row_index % max_fragment_size);
340  }
341  if (buffer_row_count <= remaining_rows_in_last_fragment) {
342  partitions.emplace_back(buffer_row_count);
343  } else {
344  if (remaining_rows_in_last_fragment > 0) {
345  partitions.emplace_back(remaining_rows_in_last_fragment);
346  }
347  size_t remaining_buffer_row_count =
348  buffer_row_count - remaining_rows_in_last_fragment;
349  while (remaining_buffer_row_count > 0) {
350  partitions.emplace_back(
351  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
352  remaining_buffer_row_count -= partitions.back();
353  }
354  }
355  return partitions;
356 }
357 
  // Requests waiting to be picked up by scan_metadata worker threads.
  std::queue<ParseBufferRequest> pending_requests;
  std::condition_variable pending_requests_condition;
  // Requests (and their buffers) available for reuse by the dispatching reader.
  std::queue<ParseBufferRequest> request_pool;
  std::mutex request_pool_mutex;
  std::condition_variable request_pool_condition;
  // Per-chunk encoders accumulating stats across buffers; shared by workers.
  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
  // Chunks being pre-populated in the disk cache during the scan.
  std::map<ChunkKey, Chunk_NS::Chunk> cached_chunks;
};
375 
380 std::optional<ParseBufferRequest> get_next_metadata_scan_request(
381  MetadataScanMultiThreadingParams& multi_threading_params) {
382  std::unique_lock<std::mutex> pending_requests_lock(
383  multi_threading_params.pending_requests_mutex);
384  multi_threading_params.pending_requests_condition.wait(
385  pending_requests_lock, [&multi_threading_params] {
386  return !multi_threading_params.pending_requests.empty() ||
387  !multi_threading_params.continue_processing;
388  });
389  if (multi_threading_params.pending_requests.empty()) {
390  return {};
391  }
392  auto request = std::move(multi_threading_params.pending_requests.front());
393  multi_threading_params.pending_requests.pop();
394  pending_requests_lock.unlock();
395  multi_threading_params.pending_requests_condition.notify_all();
396  return std::move(request);
397 }
398 
403 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
404  int fragment_id,
405  size_t first_row_index,
406  const ParseBufferResult& result,
407  const std::string& file_path) {
408  fragment_id_to_file_regions_map[fragment_id].emplace_back(
409  // file naming is handled by FileReader
410  FileRegion(result.row_offsets.front(),
411  first_row_index,
412  result.row_count,
413  result.row_offsets.back() - result.row_offsets.front()));
414 }
415 
420 void update_stats(Encoder* encoder,
421  const SQLTypeInfo& column_type,
422  DataBlockPtr data_block,
423  const size_t row_count) {
424  if (column_type.is_array()) {
425  encoder->updateStats(data_block.arraysPtr, 0, row_count);
426  } else if (!column_type.is_varlen()) {
427  encoder->updateStats(data_block.numbersPtr, row_count);
428  } else {
429  encoder->updateStats(data_block.stringsPtr, 0, row_count);
430  }
431 }
namespace {
// Returns the FSI disk cache for this catalog when caching is enabled for
// foreign storage, otherwise nullptr.
    std::shared_ptr<Catalog_Namespace::Catalog>& catalog) {
  if (catalog->getDataMgr()
          .getPersistentStorageMgr()
          ->getDiskCacheConfig()
          .isEnabledForFSI()) {
    return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
  } else {
    return nullptr;
  }
}
}  // namespace
445 
446 // If cache is enabled, populate cached_chunks buffers with data blocks
// If cache is enabled, populate cached_chunks buffers with data blocks
void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
                  DataBlockPtr data_block,
                  size_t row_count,
                  ChunkKey& chunk_key,
                  const ColumnDescriptor* column,
                  bool is_first_block,
                  bool is_last_block) {
  auto catalog =
  CHECK(catalog);
  auto cache = get_cache_if_enabled(catalog);
  if (cache) {
    // Companion index chunk key for varlen columns (sub-key 2 = index buffer).
    ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
                          chunk_key[CHUNK_KEY_TABLE_IDX],
                          chunk_key[CHUNK_KEY_COLUMN_IDX],
                          chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                          2};
    // Create actual data chunks to prepopulate cache
    if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
      cached_chunks[chunk_key] = Chunk_NS::Chunk{column};
      cached_chunks[chunk_key].setBuffer(
          cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
      if (column->columnType.is_varlen_indeed()) {
        cached_chunks[chunk_key].setIndexBuffer(
            cache->getChunkBufferForPrecaching(index_key, is_first_block));
      }
      // A first block means a brand new chunk; later blocks append to an
      // encoder that already exists.
      if (is_first_block) {
        cached_chunks[chunk_key].initEncoder();
      }
    }
    cached_chunks[chunk_key].appendData(data_block, row_count, 0);
  }
}
480 
    int fragment_id,
    const ParseBufferRequest& request,
    std::map<int, const ColumnDescriptor*>& column_by_id,
    std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
  // One lock serializes both the file-region bookkeeping and all encoder
  // updates so regions are recorded in the same order data is appended.
  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
  // File regions should be added in same order as appendData
  add_file_region(fragment_id_to_file_regions_map,
                  fragment_id,
                  request.first_row_index,
                  result,
                  request.getFilePath());

  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
    ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
    const auto column = column_by_id[column_id];
    if (column->columnType.is_varlen_indeed()) {
      chunk_key.emplace_back(1);  // sub-key 1 = data buffer of a varlen chunk
    }
    // Lazily create an encoder buffer per chunk to accumulate stats.
    if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
        multi_threading_params.chunk_encoder_buffers.end()) {
      multi_threading_params.chunk_encoder_buffers[chunk_key] =
          std::make_unique<ForeignStorageBuffer>();
      multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
          column->columnType);
    }
    update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
                 column->columnType,
                 data_block,
                 result.row_count);
    size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
                              ->getEncoder()
                              ->getNumElems() +
                          result.row_count;
    multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
        num_elements);
    cache_blocks(
        multi_threading_params.cached_chunks,
        data_block,
        result.row_count,
        chunk_key,
        column,
        (num_elements - result.row_count) == 0,  // Is the first block added to this chunk
        num_elements == request.getMaxFragRows()  // Is the last block for this chunk
    );
  }
}
534 
    ParseBufferRequest& request) {
  // Return a finished request (and its buffer) to the pool, then wake the
  // dispatching reader which may be waiting for a free request.
  std::unique_lock<std::mutex> completed_requests_queue_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool.emplace(std::move(request));
  completed_requests_queue_lock.unlock();
  multi_threading_params.request_pool_condition.notify_all();
}
547 
// Worker loop for the metadata scan: repeatedly takes a pending request,
// parses its buffer fragment-by-fragment, and records encoder stats and file
// regions. Exits when no requests remain and processing has been halted.
void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
                   std::map<int, FileRegions>& fragment_id_to_file_regions_map,
                   const TextFileBufferParser& parser) {
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_metadata_scan_request(multi_threading_params);
    // Empty optional => processing halted and queue drained.
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      // Build the column lookup once, from the first request seen.
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      // Split the buffer's rows along fragment boundaries so each parse call
      // contributes to exactly one fragment.
      auto partitions = partition_by_fragment(
          request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
      request.begin_pos = 0;
      size_t row_index = request.first_row_index;
      for (const auto partition : partitions) {
        request.process_row_count = partition;
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        auto result = parser.parseBuffer(request, true);
        int fragment_id = row_index / request.getMaxFragRows();
        process_data_blocks(multi_threading_params,
                            fragment_id,
                            request,
                            result,
                            column_by_id,
                            fragment_id_to_file_regions_map);
        row_index += result.row_count;
        // Advance to the first unparsed byte for the next partition.
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }
    } catch (...) {
      // Re-add request to pool so we don't block any other threads
      {
        std::lock_guard<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      add_request_to_pool(multi_threading_params, request);
      throw;
    }
    add_request_to_pool(multi_threading_params, request);
  }
}
603 
    MetadataScanMultiThreadingParams& multi_threading_params) {
  // Block until a reusable request (with an allocated buffer) is available in
  // the pool, then take ownership of it.
  std::unique_lock<std::mutex> request_pool_lock(
      multi_threading_params.request_pool_mutex);
  multi_threading_params.request_pool_condition.wait(
      request_pool_lock,
      [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
  auto request = std::move(multi_threading_params.request_pool.front());
  multi_threading_params.request_pool.pop();
  request_pool_lock.unlock();
  // Pooled requests must carry a valid buffer for the caller to fill.
  CHECK(request.buffer);
  return request;
}
620 
    MetadataScanMultiThreadingParams& multi_threading_params,
    ParseBufferRequest& request) {
  // Queue the filled request for a worker thread; notify outside the lock.
  {
    std::unique_lock<std::mutex> pending_requests_lock(
        multi_threading_params.pending_requests_mutex);
    multi_threading_params.pending_requests.emplace(std::move(request));
  }
  multi_threading_params.pending_requests_condition.notify_all();
}
635 
640 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
641  size_t& buffer_size,
642  const size_t alloc_size) {
643  CHECK_LE(buffer_size, alloc_size);
644  if (buffer_size < alloc_size) {
645  buffer = std::make_unique<char[]>(alloc_size);
646  buffer_size = alloc_size;
647  }
648 }
649 
    const size_t& buffer_size,
    const std::string& file_path,
    FileReader& file_reader,
    const import_export::CopyParams& copy_params,
    MetadataScanMultiThreadingParams& multi_threading_params,
    size_t& first_row_index_in_buffer,
    size_t& current_file_offset,
    const TextFileBufferParser& parser) {
  auto alloc_size = buffer_size;
  // Holds the trailing bytes of a buffer that belong to a row whose end was
  // not reached; they are prepended to the next buffer.
  auto residual_buffer = std::make_unique<char[]>(alloc_size);
  size_t residual_buffer_size = 0;
  size_t residual_buffer_alloc_size = alloc_size;

  while (!file_reader.isScanFinished()) {
    {
      // Stop dispatching if a worker reported an error.
      std::lock_guard<std::mutex> pending_requests_lock(
          multi_threading_params.pending_requests_mutex);
      if (!multi_threading_params.continue_processing) {
        break;
      }
    }
    auto request = get_request_from_pool(multi_threading_params);
    resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);

    // Start the buffer with any leftover partial row from the previous read.
    if (residual_buffer_size > 0) {
      memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
    }
    size_t size = residual_buffer_size;
    size += file_reader.read(request.buffer.get() + residual_buffer_size,
                             alloc_size - residual_buffer_size);

    if (size == 0) {
      // In some cases at the end of a file we will read 0 bytes even when
      // file_reader.isScanFinished() is false
      continue;
    } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
      // In some cases files with newlines at the end will be encoded with a second
      // newline that can end up being the only thing in the buffer
      current_file_offset++;
      continue;
    }
    unsigned int num_rows_in_buffer = 0;
    // Find the end of the last complete row in the buffer; bytes after it are
    // carried over as residual.
    request.end_pos = parser.findRowEndPosition(alloc_size,
                                                request.buffer,
                                                size,
                                                copy_params,
                                                first_row_index_in_buffer,
                                                num_rows_in_buffer,
                                                &file_reader);
    request.buffer_size = size;
    request.buffer_alloc_size = alloc_size;
    request.first_row_index = first_row_index_in_buffer;
    request.file_offset = current_file_offset;
    request.buffer_row_count = num_rows_in_buffer;

    residual_buffer_size = size - request.end_pos;
    if (residual_buffer_size > 0) {
      resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
      memcpy(residual_buffer.get(),
             request.buffer.get() + request.end_pos,
             residual_buffer_size);
    }

    current_file_offset += request.end_pos;
    first_row_index_in_buffer += num_rows_in_buffer;

    dispatch_metadata_scan_request(multi_threading_params, request);
  }

  // Wait for workers to drain the queue (or halt), then signal shutdown so
  // blocked workers in get_next_metadata_scan_request wake up and exit.
  std::unique_lock<std::mutex> pending_requests_queue_lock(
      multi_threading_params.pending_requests_mutex);
  multi_threading_params.pending_requests_condition.wait(
      pending_requests_queue_lock, [&multi_threading_params] {
        return multi_threading_params.pending_requests.empty() ||
               (multi_threading_params.continue_processing == false);
      });
  multi_threading_params.continue_processing = false;
  pending_requests_queue_lock.unlock();
  multi_threading_params.pending_requests_condition.notify_all();
}
735 
namespace {
// Create metadata for unscanned columns
// Any fragments with any updated rows between start_row and num_rows will be updated
// Chunks prior to start_row will be restored from (ie for append
// workflows)
    const ColumnDescriptor* column,
    const ForeignTable* foreign_table,
    const int db_id,
    const size_t start_row,
    const size_t total_num_rows,
    std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
  if (column->columnType.is_varlen_indeed()) {
    chunk_key.emplace_back(1);  // sub-key 1 = data buffer of a varlen chunk
  }

  // Create placeholder metadata for every fragment touched by this scan
  int start_fragment = start_row / foreign_table->maxFragRows;
  int end_fragment{0};
  if (total_num_rows > 0) {
    end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
  }
  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
    // Last (partial) fragment holds the remainder; full fragments hold
    // maxFragRows elements.
    // NOTE(review): maxFragRows * (fragment_id + 1) multiplies in int before
    // the cast — could overflow for very large tables; confirm upstream limits.
    size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
                                               (fragment_id + 1)) > total_num_rows)
                              ? total_num_rows % foreign_table->maxFragRows
                              : foreign_table->maxFragRows;

    chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
    chunk_metadata_map[chunk_key] = get_placeholder_metadata(column, num_elements);
  }
}

}  // namespace
771 
    ChunkMetadataVector& chunk_metadata_vector) {
  auto timer = DEBUG_TIMER(__func__);

  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto file_path = getFullFilePath(foreign_table_);
  CHECK(catalog);
  auto& parser = getFileBufferParser();
  auto& server_options = foreign_table_->foreign_server->options;
  // Append mode with an existing reader: only look for newly appended rows.
  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
    parser.validateFiles(file_reader_.get(), foreign_table_);
    if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
      file_reader_->checkForMoreRows(append_start_offset_);
    } else {
      UNREACHABLE();
    }
  } else {
    // Should only be called once for non-append tables
    CHECK(chunk_metadata_map_.empty());
    if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
      file_reader_ = std::make_unique<LocalMultiFileReader>(
          file_path,
          copy_params,
    } else {
      UNREACHABLE();
    }
    parser.validateFiles(file_reader_.get(), foreign_table_);
    num_rows_ = 0;
  }

  auto columns =
      catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
  for (auto column : columns) {
    column_by_id[column->columnId] = column;
  }
  MetadataScanMultiThreadingParams multi_threading_params;

  // Restore previous chunk data
  if (foreign_table_->isAppendMode()) {
    multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
  }

  // Columns that require a full scan for stats (everything not dict encoded).
  std::set<int> columns_to_scan;
  for (auto column : columns) {
    if (!skip_metadata_scan(column)) {
      columns_to_scan.insert(column->columnId);
    }
  }

  // Track where scan started for appends
  int start_row = num_rows_;
  if (!file_reader_->isScanFinished()) {
    auto buffer_size = get_buffer_size(copy_params,
                                       file_reader_->isRemainingSizeKnown(),
                                       file_reader_->getRemainingSize());
    auto thread_count = get_thread_count(copy_params,
                                         file_reader_->isRemainingSizeKnown(),
                                         file_reader_->getRemainingSize(),
                                         buffer_size);
    multi_threading_params.continue_processing = true;

    // One pooled request and one scan_metadata worker per thread.
    std::vector<std::future<void>> futures{};
    for (size_t i = 0; i < thread_count; i++) {
      multi_threading_params.request_pool.emplace(buffer_size,
                                                  copy_params,
                                                  db_id_,
                                                  columns_to_scan,

      futures.emplace_back(std::async(std::launch::async,
                                      std::ref(multi_threading_params),
                                      std::ref(parser)));
    }

    try {
          file_path,
          (*file_reader_),
          copy_params,
          multi_threading_params,
          num_rows_,
    } catch (...) {
      // Halt the workers before re-throwing so they don't wait forever.
      {
        std::unique_lock<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      multi_threading_params.pending_requests_condition.notify_all();
      throw;
    }

    for (auto& future : futures) {
      // get() instead of wait() because we need to propagate potential exceptions.
      future.get();
    }
  }

  // Finalize metadata from the per-chunk encoders accumulated by the workers.
  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
    auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
    CHECK(column_entry != column_by_id.end());
    const auto& column_type = column_entry->second->columnType;
    auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
    chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
    const auto& cached_chunks = multi_threading_params.cached_chunks;
    if (!column_type.is_varlen_indeed()) {
      // Fixed width: size is directly computable from the element count.
      chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
    } else if (auto chunk_entry = cached_chunks.find(chunk_key);
               chunk_entry != cached_chunks.end()) {
      // Varlen: take the actual byte size from the pre-cached chunk buffer.
      auto buffer = chunk_entry->second.getBuffer();
      CHECK(buffer);
      chunk_metadata->numBytes = buffer->size();
    } else {
      CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
    }
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }

  // Placeholder metadata for columns that were skipped during the scan.
  for (auto column : columns) {
    if (skip_metadata_scan(column)) {
      column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
    }
  }

  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }

  // Save chunk data
  if (foreign_table_->isAppendMode()) {
    chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
  }
}
931 
  // Serialize the wrapper's restorable state (file regions, reader state, row
  // counters) into a JSON string.
  rapidjson::Document d;
  d.SetObject();

  // Save fragment map
      "fragment_id_to_file_regions_map",
      d.GetAllocator());

  // Save reader metadata
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  file_reader_->serialize(reader_metadata, d.GetAllocator());
  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());

  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
      d, append_start_offset_, "append_start_offset", d.GetAllocator());

  return json_utils::write_to_string(d);
}
953 
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata) {
  // Rebuild wrapper state from a previously serialized JSON file plus the
  // persisted chunk metadata.
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  // Restore fragment map
      d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");

  // Construct reader with metadata
  CHECK(d.HasMember("reader_metadata"));
  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
  const auto full_file_path = getFullFilePath(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_reader_ = std::make_unique<LocalMultiFileReader>(
        full_file_path, copy_params, d["reader_metadata"]);
  } else {
    UNREACHABLE();
  }

  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");

  // Now restore the internal metadata maps
  CHECK(chunk_metadata_map_.empty());
  CHECK(chunk_encoder_buffers_.empty());

  for (auto& pair : chunk_metadata) {
    chunk_metadata_map_[pair.first] = pair.second;

    if (foreign_table_->isAppendMode()) {
      // Restore encoder state for append mode
      chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
      chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
      chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
      chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
          pair.second->numElements);
      chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
          pair.second->chunkStats);
      chunk_encoder_buffers_[pair.first]->setUpdated();
    }
  }
  is_restored_ = true;
}
1000 
  // True once restoreDataWrapper() has successfully rebuilt state from disk.
  return is_restored_;
}
1004 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser)
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< int > ChunkKey
Definition: types.h:37
virtual size_t read(void *buffer, size_t max_size)=0
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:111
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool is_last_block)
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool is_varlen() const
Definition: sqltypes.h:534
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
#define UNREACHABLE()
Definition: Logger.h:253
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::optional< ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::vector< FileRegion > FileRegions
Definition: FileRegions.h:67
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:325
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
rapidjson::Document read_from_file(const std::string &file_path)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
bool is_dict_encoded_type() const
Definition: sqltypes.h:554
bool operator<(const ParseFileRegionResult &other) const
specifies the content in-memory of a row in the column metadata table
virtual bool isScanFinished()=0
std::optional< std::string > getOption(const std::string_view &key) const
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHECK_LE(x, y)
Definition: Logger.h:220
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
V & get_from_map(std::map< K, V > &map, const K &key)
Definition: misc.h:58
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
const ForeignServer * foreign_server
Definition: ForeignTable.h:54
virtual size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:76
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const ParseBufferResult &result, const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
std::string write_to_string(const rapidjson::Document &document)
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
virtual ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const =0
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:540
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
int8_t * numbersPtr
Definition: sqltypes.h:226
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
bool is_array() const
Definition: sqltypes.h:517
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31