OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AbstractTextFileDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
31 #include "FsiJsonUtils.h"
33 #include "Shared/misc.h"
34 
35 namespace foreign_storage {
// NOTE(review): the constructor signature lines are missing from this extract.
// The three member-initializer lists below belong to the
// AbstractTextFileDataWrapper constructors: the default constructor (invalid
// db id, no table), the (db_id, foreign_table) constructor, and the
// (db_id, foreign_table, user_mapping, disable_cache) constructor — inferred
// from the members each list initializes; confirm against the header.
 37  : db_id_(-1)
 38  , foreign_table_(nullptr)
 39  , user_mapping_(nullptr)
 40  , disable_cache_(false) {}
 41 
 43  const int db_id,
 44  const ForeignTable* foreign_table)
 45  : db_id_(db_id)
 46  , foreign_table_(foreign_table)
 47  , is_restored_(false)
 48  , user_mapping_(nullptr)
 49  , disable_cache_(false) {}
 50 
 52  const int db_id,
 53  const ForeignTable* foreign_table,
 54  const UserMapping* user_mapping,
 55  const bool disable_cache)
 56  : db_id_(db_id)
 57  , foreign_table_(foreign_table)
 58  , is_restored_(false)
 59  , user_mapping_(user_mapping)
 60  , disable_cache_(disable_cache) {}
61 
62 namespace {
63 std::set<const ColumnDescriptor*> get_columns(const ChunkToBufferMap& buffers,
64  const Catalog_Namespace::Catalog& catalog,
65  const int32_t table_id,
66  const int fragment_id) {
67  CHECK(!buffers.empty());
68  std::set<const ColumnDescriptor*> columns;
69  for (const auto& entry : buffers) {
70  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
71  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
72  const auto column = catalog.getMetadataForColumn(table_id, column_id);
73  columns.emplace(column);
74  }
75  return columns;
76 }
77 
78 bool skip_metadata_scan(const ColumnDescriptor* column) {
79  return column->columnType.is_dict_encoded_type();
80 }
81 } // namespace
82 
// NOTE(review): the function signature line is missing from this extract.
// Body initializes one Chunk per requested column for the given fragment,
// keying `column_id_to_chunk_map` by column id and delegating per-chunk setup
// to init_chunk_for_column.
 84  const std::set<const ColumnDescriptor*>& columns,
 85  const int fragment_id,
 86  const ChunkToBufferMap& buffers,
 87  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
 88  for (const auto column : columns) {
// Data chunk key: {db, table, column, fragment}.
 89  ChunkKey data_chunk_key = {
 90  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
// NOTE(review): one argument line of this call (original line 92) is missing
// from the extract.
 91  init_chunk_for_column(data_chunk_key,
 93  buffers,
 94  column_id_to_chunk_map[column->columnId]);
 95  }
 96 }
97 
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::populateChunkBuffers. It initializes chunks for
// the required (and any optional) buffers of a single fragment, populates them
// from the parsed file regions, then refreshes metadata for skipped columns.
 99  const ChunkToBufferMap& required_buffers,
 100  const ChunkToBufferMap& optional_buffers,
 101  AbstractBuffer* delete_buffer) {
 102  auto timer = DEBUG_TIMER(__func__);
// NOTE(review): the line obtaining `catalog` (original line 103) is missing.
 104  CHECK(catalog);
 105  CHECK(!required_buffers.empty());
 106 
// All buffers in one call belong to a single fragment (checked in get_columns).
 107  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
 108  auto required_columns =
 109  get_columns(required_buffers, *catalog, foreign_table_->tableId, fragment_id);
 110  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
// NOTE(review): the call line preceding this argument list (original line 111)
// is missing — presumably the chunk-initialization helper.
 112  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
 113 
 114  if (!optional_buffers.empty()) {
 115  auto optional_columns =
 116  get_columns(optional_buffers, *catalog, foreign_table_->tableId, fragment_id);
 118  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
 119  }
 120  populateChunks(column_id_to_chunk_map, fragment_id, delete_buffer);
 121  updateMetadata(column_id_to_chunk_map, fragment_id);
 122 }
123 
 124 // if column was skipped during scan, update metadata now
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::updateMetadata. For every dict-encoded column
// (skipped by the metadata scan) it copies the freshly-encoded chunk stats
// into chunk_metadata_map_.
 126  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
 127  int fragment_id) {
 129  CHECK(catalog);
 130  for (auto& entry : column_id_to_chunk_map) {
 131  const auto& column =
 132  catalog->getMetadataForColumn(foreign_table_->tableId, entry.first);
 133  if (skip_metadata_scan(column)) {
 134  ChunkKey data_chunk_key = {
 135  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
// Varlen chunks key their data buffer with a trailing 1 (index buffer is 2).
 136  if (column->columnType.is_varlen_indeed()) {
 137  data_chunk_key.emplace_back(1);
 138  }
 139  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
 140  // Allocate new shared_ptr for metadata so we dont modify old one which may be
 141  // used by executor
 142  auto cached_metadata_previous =
 143  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
 144  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
 145  std::make_shared<ChunkMetadata>();
 146  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
 147  *cached_metadata = *cached_metadata_previous;
// Pull min/max/null stats from the encoder that just ingested this fragment.
 148  auto chunk_metadata =
 149  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
 150  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
 151  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
 152  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
 153  cached_metadata->numBytes = entry.second.getBuffer()->size();
 154  }
 155  }
 156 }
157 
// NOTE(review): the struct header line is missing from this extract; the
// operator< signature shows this is struct ParseFileRegionResult — the
// per-thread result of parsing a run of file regions. Ordering by file_offset
// lets results be sorted back into file order.
 162  size_t file_offset;
 163  size_t row_count;
 164  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
// Indices (relative to this result's first row) of rows rejected by the parser.
 165  std::set<size_t> rejected_row_indices;
 166 
 167  bool operator<(const ParseFileRegionResult& other) const {
 168  return file_offset < other.file_offset;
 169  }
 170 };
171 
// NOTE(review): the function's name/return-type line is missing; the body
// reads and parses file_regions[start_index..end_index] into a
// ParseFileRegionResult, accumulating row counts and rejected-row indices.
 177  const FileRegions& file_regions,
 178  const size_t start_index,
 179  const size_t end_index,
 180  FileReader& file_reader,
 181  ParseBufferRequest& parse_file_request,
 182  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
 183  const TextFileBufferParser& parser) {
 184  auto timer = DEBUG_TIMER(__func__);
 185  ParseFileRegionResult load_file_region_result{};
 186  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
 187  load_file_region_result.row_count = 0;
 188 
// NOTE(review): the declaration of `result` (original line 189) is missing —
// presumably a ParseBufferResult.
 190  for (size_t i = start_index; i <= end_index; i++) {
 191  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
 192  auto read_size = file_reader.readRegion(parse_file_request.buffer.get(),
 193  file_regions[i].first_row_file_offset,
 194  file_regions[i].region_size);
 195  CHECK_EQ(file_regions[i].region_size, read_size);
 196  parse_file_request.begin_pos = 0;
 197  parse_file_request.end_pos = file_regions[i].region_size;
 198  parse_file_request.first_row_index = file_regions[i].first_row_index;
 199  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
 200  parse_file_request.process_row_count = file_regions[i].row_count;
 201 
 202  result = parser.parseBuffer(parse_file_request, i == end_index);
 203  CHECK_EQ(file_regions[i].row_count, result.row_count);
// Rejected indices are re-based onto the running row count for this result.
 204  for (const auto& rejected_row_index : result.rejected_rows) {
 205  load_file_region_result.rejected_row_indices.insert(
 206  load_file_region_result.row_count + rejected_row_index);
 207  }
 208  load_file_region_result.row_count += result.row_count;
 209  }
// NOTE(review): the right-hand side of this assignment (original line 211) is
// missing from the extract.
 210  load_file_region_result.column_id_to_data_blocks_map =
 212  return load_file_region_result;
 213 }
214 
218 size_t get_buffer_size(const import_export::CopyParams& copy_params,
219  const bool size_known,
220  const size_t file_size) {
221  size_t buffer_size = copy_params.buffer_size;
222  if (size_known && file_size < buffer_size) {
223  buffer_size = file_size + 1; // +1 for end of line character, if missing
224  }
225  return buffer_size;
226 }
227 
228 size_t get_buffer_size(const FileRegions& file_regions) {
229  size_t buffer_size = 0;
230  for (const auto& file_region : file_regions) {
231  buffer_size = std::max(buffer_size, file_region.region_size);
232  }
233  CHECK(buffer_size);
234  return buffer_size;
235 }
236 
241 size_t get_thread_count(const import_export::CopyParams& copy_params,
242  const bool size_known,
243  const size_t file_size,
244  const size_t buffer_size) {
245  size_t thread_count = copy_params.threads;
246  if (thread_count == 0) {
247  thread_count = std::thread::hardware_concurrency();
248  }
249  if (size_known && file_size > 0) {
250  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
251  if (num_buffers_in_file < thread_count) {
252  thread_count = num_buffers_in_file;
253  }
254  }
255  CHECK_GT(thread_count, static_cast<size_t>(0));
256  return thread_count;
257 }
258 
259 size_t get_thread_count(const import_export::CopyParams& copy_params,
260  const FileRegions& file_regions) {
261  size_t thread_count = copy_params.threads;
262  if (thread_count == 0) {
263  thread_count =
264  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
265  }
266  CHECK_GT(thread_count, static_cast<size_t>(0));
267  return thread_count;
268 }
269 
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::populateChunks. It parses the fragment's file
// regions in parallel batches, appends the resulting data blocks to each
// column's chunk, and (optionally) fills a delete buffer marking rejected rows.
 271  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
 272  int fragment_id,
 273  AbstractBuffer* delete_buffer) {
 274  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
 275 
 276  CHECK(!column_id_to_chunk_map.empty());
 277  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
 278  CHECK(!file_regions.empty());
 279 
 280  const auto buffer_size = get_buffer_size(file_regions);
 281  const auto thread_count = get_thread_count(copy_params, file_regions);
 282 
// Ceiling division: regions per worker thread.
 283  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
 284 
 285  std::vector<ParseBufferRequest> parse_file_requests{};
 286  parse_file_requests.reserve(thread_count);
 287  std::vector<std::future<ParseFileRegionResult>> futures{};
 288  std::set<int> column_filter_set;
 289  for (const auto& pair : column_id_to_chunk_map) {
 290  column_filter_set.insert(pair.first);
 291  }
 292 
// Each worker gets its own FileReader, reconstructed from the serialized state
// of the wrapper's reader so all readers see the same file set.
 293  std::vector<std::unique_ptr<FileReader>> file_readers;
 294  rapidjson::Value reader_metadata(rapidjson::kObjectType);
 295  rapidjson::Document d;
 296  auto& server_options = foreign_table_->foreign_server->options;
 297  file_reader_->serialize(reader_metadata, d.GetAllocator());
 298  const auto file_path = getFullFilePath(foreign_table_);
 299  auto& parser = getFileBufferParser();
 300 
 301  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
// NOTE(review): two argument lines of this constructor call (original lines
// 305 and 308) are missing from the extract.
 302  parse_file_requests.emplace_back(buffer_size,
 303  copy_params,
 304  db_id_,
 306  column_filter_set,
 307  file_path,
 309  delete_buffer != nullptr);
 310  auto start_index = i;
 311  auto end_index =
 312  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
 313 
 314  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
 315  file_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
 316  file_path, copy_params, reader_metadata));
 317  } else {
// Only local file storage is supported by this wrapper.
 318  UNREACHABLE();
 319  }
 320 
// NOTE(review): the callable argument (original line 322) is missing —
// presumably the file-region parsing function.
 321  futures.emplace_back(std::async(std::launch::async,
 323  std::ref(file_regions),
 324  start_index,
 325  end_index,
 326  std::ref(*(file_readers.back())),
 327  std::ref(parse_file_requests.back()),
 328  std::ref(column_id_to_chunk_map),
 329  std::ref(parser)));
 330  }
 331 
 332  for (auto& future : futures) {
 333  future.wait();
 334  }
 335 
 336  std::vector<ParseFileRegionResult> load_file_region_results{};
 337  for (auto& future : futures) {
 338  load_file_region_results.emplace_back(future.get());
 339  }
 340 
// Stitch per-thread results back together in order, re-basing each thread's
// rejected-row indices onto the running chunk offset.
 341  std::set<size_t> chunk_rejected_row_indices;
 342  size_t chunk_offset = 0;
 343  for (auto result : load_file_region_results) {
 344  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
 345  chunk.appendData(
 346  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
 347  }
 348  for (const auto& rejected_row_index : result.rejected_row_indices) {
 349  chunk_rejected_row_indices.insert(rejected_row_index + chunk_offset);
 350  }
 351  chunk_offset += result.row_count;
 352  }
 353 
// One delete-flag byte per row: true for rejected rows, false otherwise.
 354  if (delete_buffer) {
 355  auto chunk_element_count = chunk_offset;
 356  delete_buffer->reserve(chunk_element_count);
 357  for (size_t i = 0; i < chunk_element_count; ++i) {
 358  if (chunk_rejected_row_indices.find(i) != chunk_rejected_row_indices.end()) {
 359  int8_t true_byte = true;
 360  delete_buffer->append(&true_byte, 1);
 361  } else {
 362  int8_t false_byte = false;
 363  delete_buffer->append(&false_byte, 1);
 364  }
 365  }
 366  }
 367 }
368 
375 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
376  const size_t max_fragment_size,
377  const size_t buffer_row_count) {
378  CHECK(buffer_row_count > 0);
379  std::vector<size_t> partitions{};
380  size_t remaining_rows_in_last_fragment;
381  if (start_row_index % max_fragment_size == 0) {
382  remaining_rows_in_last_fragment = 0;
383  } else {
384  remaining_rows_in_last_fragment =
385  max_fragment_size - (start_row_index % max_fragment_size);
386  }
387  if (buffer_row_count <= remaining_rows_in_last_fragment) {
388  partitions.emplace_back(buffer_row_count);
389  } else {
390  if (remaining_rows_in_last_fragment > 0) {
391  partitions.emplace_back(remaining_rows_in_last_fragment);
392  }
393  size_t remaining_buffer_row_count =
394  buffer_row_count - remaining_rows_in_last_fragment;
395  while (remaining_buffer_row_count > 0) {
396  partitions.emplace_back(
397  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
398  remaining_buffer_row_count -= partitions.back();
399  }
400  }
401  return partitions;
402 }
403 
// NOTE(review): the struct header and several member lines (original lines
// 411, 419, 420 — presumably `pending_requests_mutex`, `continue_processing`,
// and `disable_cache`, which the functions below reference) are missing from
// this extract. This is the shared state for the multithreaded metadata scan:
// a pending-request queue feeding workers and a request pool for buffer reuse.
 410  std::queue<ParseBufferRequest> pending_requests;
 412  std::condition_variable pending_requests_condition;
 413  std::queue<ParseBufferRequest> request_pool;
 414  std::mutex request_pool_mutex;
 415  std::condition_variable request_pool_condition;
// Per-chunk encoder buffers and pre-cached chunks built up during the scan.
 417  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
 418  std::map<ChunkKey, Chunk_NS::Chunk> cached_chunks;
 421 };
422 
427 std::optional<ParseBufferRequest> get_next_metadata_scan_request(
428  MetadataScanMultiThreadingParams& multi_threading_params) {
429  std::unique_lock<std::mutex> pending_requests_lock(
430  multi_threading_params.pending_requests_mutex);
431  multi_threading_params.pending_requests_condition.wait(
432  pending_requests_lock, [&multi_threading_params] {
433  return !multi_threading_params.pending_requests.empty() ||
434  !multi_threading_params.continue_processing;
435  });
436  if (multi_threading_params.pending_requests.empty()) {
437  return {};
438  }
439  auto request = std::move(multi_threading_params.pending_requests.front());
440  multi_threading_params.pending_requests.pop();
441  pending_requests_lock.unlock();
442  multi_threading_params.pending_requests_condition.notify_all();
443  return std::move(request);
444 }
445 
450 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
451  int fragment_id,
452  size_t first_row_index,
453  const ParseBufferResult& result,
454  const std::string& file_path) {
455  fragment_id_to_file_regions_map[fragment_id].emplace_back(
456  // file naming is handled by FileReader
457  FileRegion(result.row_offsets.front(),
458  first_row_index,
459  result.row_count,
460  result.row_offsets.back() - result.row_offsets.front()));
461 }
462 
467 void update_stats(Encoder* encoder,
468  const SQLTypeInfo& column_type,
469  DataBlockPtr data_block,
470  const size_t row_count) {
471  if (column_type.is_array()) {
472  encoder->updateStats(data_block.arraysPtr, 0, row_count);
473  } else if (!column_type.is_varlen()) {
474  encoder->updateStats(data_block.numbersPtr, row_count);
475  } else {
476  encoder->updateStats(data_block.stringsPtr, 0, row_count);
477  }
478 }
479 namespace {
// NOTE(review): the return-type/name line is missing from this extract; the
// trailing index text identifies it as get_cache_if_enabled, returning a
// ForeignStorageCache* — the disk cache when caching is not disabled and FSI
// caching is enabled in the disk-cache config, nullptr otherwise.
 481  std::shared_ptr<Catalog_Namespace::Catalog>& catalog,
 482  const bool disable_cache) {
 483  if (!disable_cache && catalog->getDataMgr()
 484  .getPersistentStorageMgr()
 485  ->getDiskCacheConfig()
 486  .isEnabledForFSI()) {
 487  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
 488  } else {
 489  return nullptr;
 490  }
 491 }
492 } // namespace
493 
 494 // If cache is enabled, populate cached_chunks buffers with data blocks
 495 void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
 496  DataBlockPtr data_block,
 497  size_t row_count,
 498  ChunkKey& chunk_key,
 499  const ColumnDescriptor* column,
 500  bool is_first_block,
 501  bool is_last_block,
 502  bool disable_cache) {
// NOTE(review): the right-hand side of this assignment (original line 504) is
// missing from the extract — presumably the catalog lookup for the chunk's db.
 503  auto catalog =
 505  CHECK(catalog);
 506  auto cache = get_cache_if_enabled(catalog, disable_cache);
 507  if (cache) {
 508  // This extra filter needs to be here because this wrapper is the only one that
 509  // accesses the cache directly and it should not be inserting chunks which are not
 510  // mapped to the current leaf (in distributed mode).
 511  if (key_does_not_shard_to_leaf(chunk_key)) {
 512  return;
 513  }
 514 
// Index buffers for varlen chunks use a trailing key component of 2.
 515  ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
 516  chunk_key[CHUNK_KEY_TABLE_IDX],
 517  chunk_key[CHUNK_KEY_COLUMN_IDX],
 518  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
 519  2};
 520  // Create actual data chunks to prepopulate cache
 521  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
 522  cached_chunks[chunk_key] = Chunk_NS::Chunk{column, false};
 523  cached_chunks[chunk_key].setBuffer(
 524  cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
 525  if (column->columnType.is_varlen_indeed()) {
 526  cached_chunks[chunk_key].setIndexBuffer(
 527  cache->getChunkBufferForPrecaching(index_key, is_first_block));
 528  }
 529  if (is_first_block) {
 530  cached_chunks[chunk_key].initEncoder();
 531  }
 532  }
 533  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
 534  }
 535 }
536 
// NOTE(review): the signature line is missing; the trailing index and call
// sites identify this as process_data_blocks, which records the parsed file
// region, updates per-chunk encoder stats/row counts, and optionally caches
// data blocks. The whole body runs under the chunk_encoder_buffers mutex.
 543  int fragment_id,
 544  const ParseBufferRequest& request,
// NOTE(review): one parameter line (original line 545) is missing — presumably
// `ParseBufferResult& result`, which the body uses.
 546  std::map<int, const ColumnDescriptor*>& column_by_id,
 547  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
 548  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
 549  // File regions should be added in same order as appendData
 550  add_file_region(fragment_id_to_file_regions_map,
 551  fragment_id,
 552  request.first_row_index,
 553  result,
 554  request.getFilePath());
 555 
 556  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
 557  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
 558  const auto column = column_by_id[column_id];
 559  if (column->columnType.is_varlen_indeed()) {
 560  chunk_key.emplace_back(1);
 561  }
// Lazily create an encoder buffer per chunk on first sight of the chunk key.
 562  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
 563  multi_threading_params.chunk_encoder_buffers.end()) {
 564  multi_threading_params.chunk_encoder_buffers[chunk_key] =
 565  std::make_unique<ForeignStorageBuffer>();
 566  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
 567  column->columnType);
 568  }
 569  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
 570  column->columnType,
 571  data_block,
 572  result.row_count);
 573  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
 574  ->getEncoder()
 575  ->getNumElems() +
 576  result.row_count;
 577  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
 578  num_elements);
 579  cache_blocks(
 580  multi_threading_params.cached_chunks,
 581  data_block,
 582  result.row_count,
 583  chunk_key,
 584  column,
 585  (num_elements - result.row_count) == 0, // Is the first block added to this chunk
 586  num_elements == request.getMaxFragRows(), // Is the last block for this chunk
 587  multi_threading_params.disable_cache);
 588  }
 589 }
590 
// NOTE(review): the signature line is missing; call sites identify this as
// add_request_to_pool — returns a finished request's buffer to the reuse pool
// and wakes any dispatcher waiting for a free request.
 596  ParseBufferRequest& request) {
 597  std::unique_lock<std::mutex> completed_requests_queue_lock(
 598  multi_threading_params.request_pool_mutex);
 599  multi_threading_params.request_pool.emplace(std::move(request));
 600  completed_requests_queue_lock.unlock();
 601  multi_threading_params.request_pool_condition.notify_all();
 602 }
603 
// Worker-thread loop for the metadata scan: repeatedly takes a pending
// ParseBufferRequest, parses its buffer fragment-by-fragment, and feeds the
// results into the shared encoder buffers / file-region map. Exits when the
// request queue is drained and processing has stopped. On any exception the
// request is returned to the pool and processing is flagged to stop before
// rethrowing, so other threads are not left blocked.
 608 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
 609  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
 610  const TextFileBufferParser& parser) {
 611  std::map<int, const ColumnDescriptor*> column_by_id{};
 612  while (true) {
 613  auto request_opt = get_next_metadata_scan_request(multi_threading_params);
 614  if (!request_opt.has_value()) {
 615  break;
 616  }
 617  auto& request = request_opt.value();
 618  try {
// Build the column lookup once, from the first request seen by this thread.
 619  if (column_by_id.empty()) {
 620  for (const auto column : request.getColumns()) {
 621  column_by_id[column->columnId] = column;
 622  }
 623  }
// Split the buffer's rows along fragment boundaries so each parse maps to a
// single fragment id.
 624  auto partitions = partition_by_fragment(
 625  request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
 626  request.begin_pos = 0;
 627  size_t row_index = request.first_row_index;
 628  for (const auto partition : partitions) {
 629  request.process_row_count = partition;
 630  for (const auto& import_buffer : request.import_buffers) {
 631  if (import_buffer != nullptr) {
 632  import_buffer->clear();
 633  }
 634  }
 635  auto result = parser.parseBuffer(request, true);
 636  int fragment_id = row_index / request.getMaxFragRows();
 637  process_data_blocks(multi_threading_params,
 638  fragment_id,
 639  request,
 640  result,
 641  column_by_id,
 642  fragment_id_to_file_regions_map);
 643  row_index += result.row_count;
// Advance within the buffer to just past the rows consumed by this partition.
 644  request.begin_pos = result.row_offsets.back() - request.file_offset;
 645  }
 646  } catch (...) {
 647  // Re-add request to pool so we dont block any other threads
 648  {
 649  std::lock_guard<std::mutex> pending_requests_lock(
 650  multi_threading_params.pending_requests_mutex);
 651  multi_threading_params.continue_processing = false;
 652  }
 653  add_request_to_pool(multi_threading_params, request);
 654  throw;
 655  }
 656  add_request_to_pool(multi_threading_params, request);
 657  }
 658 }
659 
// NOTE(review): the return-type/name line is missing; call sites identify this
// as get_request_from_pool — blocks until a reusable ParseBufferRequest is
// available in the pool, then pops and returns it (its buffer must be set).
 664  MetadataScanMultiThreadingParams& multi_threading_params) {
 665  std::unique_lock<std::mutex> request_pool_lock(
 666  multi_threading_params.request_pool_mutex);
 667  multi_threading_params.request_pool_condition.wait(
 668  request_pool_lock,
 669  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
 670  auto request = std::move(multi_threading_params.request_pool.front());
 671  multi_threading_params.request_pool.pop();
 672  request_pool_lock.unlock();
 673  CHECK(request.buffer);
 674  return request;
 675 }
676 
// NOTE(review): the name line is missing; call sites identify this as
// dispatch_metadata_scan_request — enqueues a filled request for the worker
// threads and notifies them. The lock is released (scope exit) before
// notifying to avoid waking a waiter that would immediately block.
 682  MetadataScanMultiThreadingParams& multi_threading_params,
 683  ParseBufferRequest& request) {
 684  {
 685  std::unique_lock<std::mutex> pending_requests_lock(
 686  multi_threading_params.pending_requests_mutex);
 687  multi_threading_params.pending_requests.emplace(std::move(request));
 688  }
 689  multi_threading_params.pending_requests_condition.notify_all();
 690 }
691 
696 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
697  size_t& buffer_size,
698  const size_t alloc_size) {
699  CHECK_LE(buffer_size, alloc_size);
700  if (buffer_size < alloc_size) {
701  buffer = std::make_unique<char[]>(alloc_size);
702  buffer_size = alloc_size;
703  }
704 }
705 
// NOTE(review): the return-type/name lines are missing; the trailing index
// identifies this as dispatch_metadata_scan_requests — the producer side of
// the metadata scan. It reads the file in buffer-sized pieces, carries any
// partial trailing row over into a residual buffer, and dispatches complete
// requests to the worker threads. At the end it waits for the pending queue
// to drain (or for cancellation) and then signals workers to stop.
 711  const size_t& buffer_size,
 712  const std::string& file_path,
 713  FileReader& file_reader,
 714  const import_export::CopyParams& copy_params,
 715  MetadataScanMultiThreadingParams& multi_threading_params,
 716  size_t& first_row_index_in_buffer,
 717  size_t& current_file_offset,
 718  const TextFileBufferParser& parser) {
 719  auto alloc_size = buffer_size;
 720  auto residual_buffer = std::make_unique<char[]>(alloc_size);
 721  size_t residual_buffer_size = 0;
 722  size_t residual_buffer_alloc_size = alloc_size;
 723 
 724  while (!file_reader.isScanFinished()) {
// Stop early if a worker flagged cancellation (e.g. after an exception).
 725  {
 726  std::lock_guard<std::mutex> pending_requests_lock(
 727  multi_threading_params.pending_requests_mutex);
 728  if (!multi_threading_params.continue_processing) {
 729  break;
 730  }
 731  }
 732  auto request = get_request_from_pool(multi_threading_params);
 733  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
 734 
// Prepend the leftover partial row from the previous iteration, then fill the
// rest of the buffer from the file.
 735  if (residual_buffer_size > 0) {
 736  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
 737  }
 738  size_t size = residual_buffer_size;
 739  size += file_reader.read(request.buffer.get() + residual_buffer_size,
 740  alloc_size - residual_buffer_size);
 741 
 742  if (size == 0) {
 743  // In some cases at the end of a file we will read 0 bytes even when
 744  // file_reader.isScanFinished() is false
 745  continue;
 746  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
 747  // In some cases files with newlines at the end will be encoded with a second
 748  // newline that can end up being the only thing in the buffer
 749  current_file_offset++;
 750  continue;
 751  }
 752  unsigned int num_rows_in_buffer = 0;
// Find the last complete row in the buffer; everything past it becomes the
// residual carried into the next request.
 753  request.end_pos = parser.findRowEndPosition(alloc_size,
 754  request.buffer,
 755  size,
 756  copy_params,
 757  first_row_index_in_buffer,
 758  num_rows_in_buffer,
 759  &file_reader);
 760  request.buffer_size = size;
 761  request.buffer_alloc_size = alloc_size;
 762  request.first_row_index = first_row_index_in_buffer;
 763  request.file_offset = current_file_offset;
 764  request.buffer_row_count = num_rows_in_buffer;
 765 
 766  residual_buffer_size = size - request.end_pos;
 767  if (residual_buffer_size > 0) {
 768  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
 769  memcpy(residual_buffer.get(),
 770  request.buffer.get() + request.end_pos,
 771  residual_buffer_size);
 772  }
 773 
 774  current_file_offset += request.end_pos;
 775  first_row_index_in_buffer += num_rows_in_buffer;
 776 
 777  dispatch_metadata_scan_request(multi_threading_params, request);
 778  }
 779 
// Wait for workers to drain the queue (or for cancellation), then tell them
// to stop and wake any still waiting.
 780  std::unique_lock<std::mutex> pending_requests_queue_lock(
 781  multi_threading_params.pending_requests_mutex);
 782  multi_threading_params.pending_requests_condition.wait(
 783  pending_requests_queue_lock, [&multi_threading_params] {
 784  return multi_threading_params.pending_requests.empty() ||
 785  (multi_threading_params.continue_processing == false);
 786  });
 787  multi_threading_params.continue_processing = false;
 788  pending_requests_queue_lock.unlock();
 789  multi_threading_params.pending_requests_condition.notify_all();
 790 }
791 
792 namespace {
 793 // Create metadata for unscanned columns
 794 // Any fragments with any updated rows between start_row and num_rows will be updated
 795 // Chunks prior to start_row will be restored from (ie for append
 796 // workflows)
// NOTE(review): the function name line is missing from this extract; the body
// emits placeholder ChunkMetadata entries for every fragment in
// [start_row / maxFragRows, (total_num_rows - 1) / maxFragRows].
 798  const ColumnDescriptor* column,
 799  const ForeignTable* foreign_table,
 800  const int db_id,
 801  const size_t start_row,
 802  const size_t total_num_rows,
 803  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
 804  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
 805  if (column->columnType.is_varlen_indeed()) {
 806  chunk_key.emplace_back(1);
 807  }
 808 
 809  // Create placeholder metadata for every fragment touched by this scan
 810  int start_fragment = start_row / foreign_table->maxFragRows;
 811  int end_fragment{0};
 812  if (total_num_rows > 0) {
 813  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
 814  }
 815  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
// Last fragment holds the remainder; all others are full.
// NOTE(review): `maxFragRows * (fragment_id + 1)` multiplies in int before the
// cast to size_t — could overflow for very large tables; verify upstream limits.
 816  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
 817  (fragment_id + 1)) > total_num_rows)
 818  ? total_num_rows % foreign_table->maxFragRows
 819  : foreign_table->maxFragRows;
 820 
 821  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
 822  chunk_metadata_map[chunk_key] = get_placeholder_metadata(column, num_elements);
 823  }
 824 }
825 
826 } // namespace
827 
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::populateChunkMetadata. It (re)creates the file
// reader, runs the multithreaded metadata scan over all non-dict-encoded
// columns, converts the accumulated encoder state into chunk metadata, adds
// placeholder metadata for skipped columns, and emits everything into
// `chunk_metadata_vector`.
 841  ChunkMetadataVector& chunk_metadata_vector) {
 842  auto timer = DEBUG_TIMER(__func__);
 843 
 844  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
 845  const auto file_path = getFullFilePath(foreign_table_);
// NOTE(review): the line obtaining `catalog` (original line 846) is missing.
 847  CHECK(catalog);
 848  auto& parser = getFileBufferParser();
 849  auto& server_options = foreign_table_->foreign_server->options;
// Append mode with an existing reader: just look for newly appended rows.
 850  if (foreign_table_->isAppendMode() && file_reader_ != nullptr) {
 851  parser.validateFiles(file_reader_.get(), foreign_table_);
 852  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
 853  file_reader_->checkForMoreRows(append_start_offset_);
 854  } else {
 855  UNREACHABLE();
 856  }
 857  } else {
 858  // Should only be called once for non-append tables
 859  CHECK(chunk_metadata_map_.empty());
// NOTE(review): several lines of this branch (original lines 860, 865-869,
// 875) are missing from the extract, including trailing constructor arguments.
 861  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
 862  file_reader_ = std::make_unique<LocalMultiFileReader>(
 863  file_path,
 864  copy_params,
 870  } else {
 871  UNREACHABLE();
 872  }
 873  parser.validateFiles(file_reader_.get(), foreign_table_);
 874  num_rows_ = 0;
 876  }
 877 
 878  auto columns =
 879  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
 880  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
 881  for (auto column : columns) {
 882  column_by_id[column->columnId] = column;
 883  }
 884  MetadataScanMultiThreadingParams multi_threading_params;
 885  multi_threading_params.disable_cache = disable_cache_;
 886 
 887  // Restore previous chunk data
 888  if (foreign_table_->isAppendMode()) {
 889  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
 890  }
 891 
// Dict-encoded columns are skipped here; they get placeholder metadata below.
 892  std::set<int> columns_to_scan;
 893  for (auto column : columns) {
 894  if (!skip_metadata_scan(column)) {
 895  columns_to_scan.insert(column->columnId);
 896  }
 897  }
 898 
 899  // Track where scan started for appends
 900  int start_row = num_rows_;
 901  if (!file_reader_->isScanFinished()) {
 902  auto buffer_size = get_buffer_size(copy_params,
 903  file_reader_->isRemainingSizeKnown(),
 904  file_reader_->getRemainingSize());
 905  auto thread_count = get_thread_count(copy_params,
 906  file_reader_->isRemainingSizeKnown(),
 907  file_reader_->getRemainingSize(),
 908  buffer_size);
 909  multi_threading_params.continue_processing = true;
 910 
 911  std::vector<std::future<void>> futures{};
// Pre-populate the request pool with one reusable request per worker thread.
// NOTE(review): several argument lines of this emplace (original lines 916,
// 918, 920) are missing from the extract.
 912  for (size_t i = 0; i < thread_count; i++) {
 913  multi_threading_params.request_pool.emplace(buffer_size,
 914  copy_params,
 915  db_id_,
 917  columns_to_scan,
 919  nullptr,
 921  // TODO: when the cache is renabled for the import case, the above
 922  // relationship between `disable_cache_` and `track_rejected_rows`
 923  // will no longer hold and will need to be addressed using a different
 924  // approach
 925 
// NOTE(review): the callable and one argument line (original lines 927, 929)
// are missing — presumably scan_metadata and the file-region map.
 926  futures.emplace_back(std::async(std::launch::async,
 928  std::ref(multi_threading_params),
 930  std::ref(parser)));
 931  }
 932 
// NOTE(review): the call line and trailing arguments of this try body
// (original lines 934, 940, 941) are missing — presumably
// dispatch_metadata_scan_requests(...).
 933  try {
 935  file_path,
 936  (*file_reader_),
 937  copy_params,
 938  multi_threading_params,
 939  num_rows_,
 942  } catch (...) {
 943  {
 944  std::unique_lock<std::mutex> pending_requests_lock(
 945  multi_threading_params.pending_requests_mutex);
 946  multi_threading_params.continue_processing = false;
 947  }
 948  multi_threading_params.pending_requests_condition.notify_all();
 949  throw;
 950  }
 951 
 952  for (auto& future : futures) {
 953  // get() instead of wait() because we need to propagate potential exceptions.
 954  future.get();
 955  }
 956  }
 957 
// Convert accumulated encoder state into chunk metadata entries.
 958  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
 959  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
 960  CHECK(column_entry != column_by_id.end());
 961  const auto& column_type = column_entry->second->columnType;
 962  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
 963  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
 964  const auto& cached_chunks = multi_threading_params.cached_chunks;
 965  if (!column_type.is_varlen_indeed()) {
 966  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
 967  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
 968  chunk_entry != cached_chunks.end()) {
 969  auto buffer = chunk_entry->second.getBuffer();
 970  CHECK(buffer);
 971  chunk_metadata->numBytes = buffer->size();
 972  } else {
 973  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
 974  }
 975  chunk_metadata_map_[chunk_key] = chunk_metadata;
 976  }
 977 
// NOTE(review): the call line (original line 980) is missing — presumably
// the placeholder-metadata helper for skipped (dict-encoded) columns.
 978  for (auto column : columns) {
 979  if (skip_metadata_scan(column)) {
 981  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
 982  }
 983  }
 984 
 985  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
 986  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
 987  }
 988 
 989  // Save chunk data
 990  if (foreign_table_->isAppendMode()) {
 991  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
 992  }
 993 }
994 
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::getSerializedDataWrapper — serializes the
// wrapper state (file-region map, reader metadata, row count, append offset)
// into a JSON string for later restoration.
 996  rapidjson::Document d;
 997  d.SetObject();
 998 
 999  // Save fragment map
// NOTE(review): the first lines of this add_value_to_object call (original
// lines 1000-1001) are missing from the extract.
 1002  "fragment_id_to_file_regions_map",
 1003  d.GetAllocator());
 1004 
 1005  // Save reader metadata
 1006  rapidjson::Value reader_metadata(rapidjson::kObjectType);
 1007  file_reader_->serialize(reader_metadata, d.GetAllocator());
 1008  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
 1009 
// NOTE(review): the call line preceding this argument list (original line
// 1011) is missing.
 1010  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
 1012  d, append_start_offset_, "append_start_offset", d.GetAllocator());
 1013 
 1014  return json_utils::write_to_string(d);
 1015 }
1016 
// NOTE(review): the signature line is missing; from the body this is
// AbstractTextFileDataWrapper::restoreDataWrapper — rebuilds wrapper state
// from a serialized JSON file plus previously saved chunk metadata.
 1018  const std::string& file_path,
 1019  const ChunkMetadataVector& chunk_metadata) {
 1020  auto d = json_utils::read_from_file(file_path);
 1021  CHECK(d.IsObject());
 1022 
 1023  // Restore fragment map
// NOTE(review): the call line (original line 1024) is missing — presumably
// json_utils::get_value_from_object(...).
 1025  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
 1026 
 1027  // Construct reader with metadata
 1028  CHECK(d.HasMember("reader_metadata"));
 1029  const auto copy_params = getFileBufferParser().validateAndGetCopyParams(foreign_table_);
 1030  const auto full_file_path = getFullFilePath(foreign_table_);
 1031  auto& server_options = foreign_table_->foreign_server->options;
 1032  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
 1033  file_reader_ = std::make_unique<LocalMultiFileReader>(
 1034  full_file_path, copy_params, d["reader_metadata"]);
 1035  } else {
 1036  UNREACHABLE();
 1037  }
 1038 
// NOTE(review): the preceding get_value_from_object for num_rows_ (original
// line 1039) is missing.
 1040  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
 1041 
 1042  // Now restore the internal metadata maps
 1043  CHECK(chunk_metadata_map_.empty());
 1044  CHECK(chunk_encoder_buffers_.empty());
 1045 
 1046  for (auto& pair : chunk_metadata) {
 1047  chunk_metadata_map_[pair.first] = pair.second;
 1048 
 1049  if (foreign_table_->isAppendMode()) {
 1050  // Restore encoder state for append mode
 1051  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
 1052  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
 1053  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
 1054  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
 1055  pair.second->numElements);
 1056  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
 1057  pair.second->chunkStats);
 1058  chunk_encoder_buffers_[pair.first]->setUpdated();
 1059  }
 1060  }
 1061  is_restored_ = true;
 1062 }
1063 
// NOTE(review): the signature line is missing; this is the body of
// AbstractTextFileDataWrapper::isRestored — reports whether the wrapper state
// was loaded via restoreDataWrapper.
 1065  return is_restored_;
 1066 }
1067 
 1068 // declared in three derived classes to avoid
 1069 // polluting ForeignDataWrapper virtual base
 1070 // @TODO refactor to lower class if needed
// NOTE(review): the signature line is missing; from the comments above this is
// the createRenderGroupAnalyzers method — it builds one RenderGroupAnalyzer
// per POLYGON-type column of the foreign table.
 1072  // must have these
 1073  CHECK_GE(db_id_, 0);
// NOTE(review): original lines 1071, 1074, 1077, and 1083 (including the
// catalog lookup and the map/CHECK wrapping the try_emplace below) are
// missing from this extract.
 1075 
 1076  // populate map for all poly columns in this table
 1078  CHECK(catalog);
 1079  auto columns =
 1080  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
 1081  for (auto const& column : columns) {
 1082  if (IS_GEO_POLY(column->columnType.get_type())) {
 1084  .try_emplace(column->columnId,
 1085  std::make_unique<import_export::RenderGroupAnalyzer>())
 1086  .second);
 1087  }
 1088  }
 1089 }
1090 
1091 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, FileReader &file_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset, const TextFileBufferParser &parser)
#define CHECK_EQ(x, y)
Definition: Logger.h:231
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool is_last_block, bool disable_cache)
std::vector< int > ChunkKey
Definition: types.h:37
virtual size_t read(void *buffer, size_t max_size)=0
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog, const bool disable_cache)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:114
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool is_varlen() const
Definition: sqltypes.h:536
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
#define UNREACHABLE()
Definition: Logger.h:267
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
#define CHECK_GE(x, y)
Definition: Logger.h:236
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
virtual const TextFileBufferParser & getFileBufferParser() const =0
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::optional< ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void createRenderGroupAnalyzers() override
Create RenderGroupAnalyzers for poly columns.
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
std::vector< FileRegion > FileRegions
Definition: FileRegions.h:67
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
bool key_does_not_shard_to_leaf(const ChunkKey &key)
future< Result > async(Fn &&fn, Args &&...args)
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
rapidjson::Document read_from_file(const std::string &file_path)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, FileReader &file_reader, ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, const TextFileBufferParser &parser)
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, const TextFileBufferParser &parser)
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
bool is_dict_encoded_type() const
Definition: sqltypes.h:560
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
specifies the content in-memory of a row in the column metadata table
virtual bool isScanFinished()=0
std::optional< std::string > getOption(const std::string_view &key) const
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const ParseBufferRequest &request, ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHECK_LE(x, y)
Definition: Logger.h:234
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id, AbstractBuffer *delete_buffer)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
virtual size_t findRowEndPosition(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
bool g_enable_watchdog false
Definition: Execute.cpp:79
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const ParseBufferResult &result, const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:223
#define DEBUG_TIMER(name)
Definition: Logger.h:370
virtual import_export::CopyParams validateAndGetCopyParams(const ForeignTable *foreign_table) const =0
std::string write_to_string(const rapidjson::Document &document)
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
virtual ParseBufferResult parseBuffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const =0
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:542
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
int8_t * numbersPtr
Definition: sqltypes.h:226
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
virtual void reserve(size_t num_bytes)=0
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, ParseBufferRequest &request)
bool is_array() const
Definition: sqltypes.h:518
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255