OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "CsvDataWrapper.h"
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
27 #include "CsvShared.h"
31 #include "FsiJsonUtils.h"
33 #include "ImportExport/Importer.h"
34 #include "Shared/sqltypes.h"
35 #include "Utils/DdlUtils.h"
36 
37 namespace foreign_storage {
38 CsvDataWrapper::CsvDataWrapper() : db_id_(-1), foreign_table_(nullptr) {}
39 
40 CsvDataWrapper::CsvDataWrapper(const int db_id, const ForeignTable* foreign_table)
41  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
42 
43 void CsvDataWrapper::validateTableOptions(const ForeignTable* foreign_table) const {
45  Csv::validate_options(foreign_table);
46 }
47 const std::set<std::string_view>& CsvDataWrapper::getSupportedTableOptions() const {
48  static const auto supported_table_options = getAllCsvTableOptions();
49  return supported_table_options;
50 }
51 
// Builds the union of base-wrapper table options and the CSV-specific options
// in csv_table_options_.
// NOTE(review): the constructor arguments for supported_table_options (original
// lines 54-55) are missing from this extraction — presumably the base class's
// supported options; confirm against the repository source.
52 std::set<std::string_view> CsvDataWrapper::getAllCsvTableOptions() const {
53  std::set<std::string_view> supported_table_options(
56  supported_table_options.insert(csv_table_options_.begin(), csv_table_options_.end());
57  return supported_table_options;
58 }
59 
60 namespace {
61 std::set<const ColumnDescriptor*> get_columns(
62  const ChunkToBufferMap& buffers,
63  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
64  const int32_t table_id,
65  const int fragment_id) {
66  CHECK(!buffers.empty());
67  std::set<const ColumnDescriptor*> columns;
68  for (const auto& entry : buffers) {
69  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
70  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
71  const auto column = catalog->getMetadataForColumnUnlocked(table_id, column_id);
72  columns.emplace(column);
73  }
74  return columns;
75 }
76 } // namespace
77 
78 namespace {
79 bool skip_metadata_scan(const ColumnDescriptor* column) {
80  return column->columnType.is_dict_encoded_type();
81 }
82 } // namespace
83 
// Builds a Chunk for each requested column of `fragment_id` and stores it in
// column_id_to_chunk_map, wiring the chunk to the caller-supplied buffers.
// NOTE(review): the function signature line (original line 84) is missing from
// this extraction — presumably a CsvDataWrapper member (it reads db_id_,
// foreign_table_ and chunk_metadata_map_); verify the name in the repository.
85  const std::set<const ColumnDescriptor*>& columns,
86  const int fragment_id,
87  const ChunkToBufferMap& buffers,
88  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
89  for (const auto column : columns) {
// Data chunk key: {db, table, column, fragment}.
90  ChunkKey data_chunk_key = {
91  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
92  column_id_to_chunk_map[column->columnId] =
93  Csv::make_chunk_for_column(data_chunk_key, chunk_metadata_map_, buffers);
94  }
95 }
96 
// Fills the caller-provided chunk buffers (required + optional) for one
// fragment, then updates metadata and detaches the buffers from the temporary
// Chunk objects so their destructors do not free caller-owned memory.
// NOTE(review): original lines 97/100/108/115 are missing from this extraction
// (first signature line, catalog acquisition, and the calls that populate the
// chunk map per column set).
98  const ChunkToBufferMap& optional_buffers) {
99  auto timer = DEBUG_TIMER(__func__);
101  CHECK(catalog);
102  CHECK(!required_buffers.empty());
103 
// All required buffers belong to the same fragment.
104  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
105  std::set<const ColumnDescriptor*> required_columns =
106  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
107  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
109  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
110 
111  if (!optional_buffers.empty()) {
112  std::set<const ColumnDescriptor*> optional_columns;
113  optional_columns =
114  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
116  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
117  }
118  populateChunks(column_id_to_chunk_map, fragment_id);
119  updateMetadata(column_id_to_chunk_map, fragment_id);
// Detach buffers: the Chunk wrappers are temporaries, the buffers are not ours.
120  for (auto& entry : column_id_to_chunk_map) {
121  entry.second.setBuffer(nullptr);
122  entry.second.setIndexBuffer(nullptr);
123  }
124 }
125 
126 // if column was skipped during scan, update metadata now
// For each column skipped during the metadata scan (dict-encoded columns),
// copy the freshly-computed encoder stats into the cached metadata entry and
// push it to the fragmenter.
// NOTE(review): original lines 127 (signature) and 132 (catalog acquisition)
// are missing from this extraction.
128  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
129  int fragment_id) {
130  auto fragmenter = foreign_table_->fragmenter;
131  if (fragmenter) {
133  CHECK(catalog);
134  for (auto& entry : column_id_to_chunk_map) {
135  const auto& column =
136  catalog->getMetadataForColumnUnlocked(foreign_table_->tableId, entry.first);
137  if (skip_metadata_scan(column)) {
138  ChunkKey data_chunk_key = {
139  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
// Varlen columns key their data chunk with a trailing 1 (index chunk is 2).
140  if (column->columnType.is_varlen_indeed()) {
141  data_chunk_key.emplace_back(1);
142  }
143  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
144  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
145  auto chunk_metadata =
146  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
// Only stats and byte size are refreshed; other cached fields are kept.
147  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
148  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
149  cached_metadata->chunkStats.has_nulls = chunk_metadata->chunkStats.has_nulls;
150  cached_metadata->numBytes = entry.second.getBuffer()->size();
151  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
152  }
153  }
154  }
155 }
156 
// Result of parsing one batch of file regions: where the batch started, how
// many rows it produced, and the parsed data blocks per column id.
// NOTE(review): the struct declaration line itself (original lines ~157-161)
// is missing from this extraction.
162  size_t file_offset;
163  size_t row_count;
164  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
165 
// Orders results by their position in the file so batches can be re-assembled
// in file order.
166  bool operator<(const ParseFileRegionResult& other) const {
167  return file_offset < other.file_offset;
168  }
169 };
170 
// Reads and parses file regions [start_index, end_index], accumulating the
// total row count and returning the parsed data blocks for the batch.
// NOTE(review): the function signature (original lines ~171-175), the
// declaration of `result` (line 186), and the right-hand side of the final
// assignment (line 208) are missing from this extraction.
176  const FileRegions& file_regions,
177  const size_t start_index,
178  const size_t end_index,
179  CsvReader& csv_reader,
180  csv_file_buffer_parser::ParseBufferRequest& parse_file_request,
181  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
182  ParseFileRegionResult load_file_region_result{};
183  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
184  load_file_region_result.row_count = 0;
185 
187  for (size_t i = start_index; i <= end_index; i++) {
// Region must fit in the request buffer sized by get_buffer_size().
188  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
189  size_t read_size;
190  {
191  read_size = csv_reader.readRegion(parse_file_request.buffer.get(),
192  file_regions[i].first_row_file_offset,
193  file_regions[i].region_size);
194  }
195 
196  CHECK_EQ(file_regions[i].region_size, read_size);
197  parse_file_request.begin_pos = 0;
198  parse_file_request.end_pos = file_regions[i].region_size;
199  parse_file_request.first_row_index = file_regions[i].first_row_index;
200  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
201  parse_file_request.process_row_count = file_regions[i].row_count;
202 
// Parsed row count must match what the metadata scan recorded for the region.
203  result = parse_buffer(parse_file_request, i == end_index);
204  CHECK_EQ(file_regions[i].row_count, result.row_count);
205  load_file_region_result.row_count += result.row_count;
206  }
207  load_file_region_result.column_id_to_data_blocks_map =
209  return load_file_region_result;
210 }
211 
215 size_t get_buffer_size(const import_export::CopyParams& copy_params,
216  const bool size_known,
217  const size_t file_size) {
218  size_t buffer_size = copy_params.buffer_size;
219  if (size_known && file_size < buffer_size) {
220  buffer_size = file_size + 1; // +1 for end of line character, if missing
221  }
222  return buffer_size;
223 }
224 
225 size_t get_buffer_size(const FileRegions& file_regions) {
226  size_t buffer_size = 0;
227  for (const auto& file_region : file_regions) {
228  buffer_size = std::max(buffer_size, file_region.region_size);
229  }
230  CHECK(buffer_size);
231  return buffer_size;
232 }
233 
238 size_t get_thread_count(const import_export::CopyParams& copy_params,
239  const bool size_known,
240  const size_t file_size,
241  const size_t buffer_size) {
242  size_t thread_count = copy_params.threads;
243  if (thread_count == 0) {
244  thread_count = std::thread::hardware_concurrency();
245  }
246  if (size_known) {
247  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
248  if (num_buffers_in_file < thread_count) {
249  thread_count = num_buffers_in_file;
250  }
251  }
252  CHECK(thread_count);
253  return thread_count;
254 }
255 
256 size_t get_thread_count(const import_export::CopyParams& copy_params,
257  const FileRegions& file_regions) {
258  size_t thread_count = copy_params.threads;
259  if (thread_count == 0) {
260  thread_count =
261  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
262  }
263  CHECK(thread_count);
264  return thread_count;
265 }
266 
// Parses this fragment's file regions in parallel batches and appends the
// resulting data blocks into the per-column chunks.
// NOTE(review): original lines 267 (signature), 300 (a ParseBufferRequest
// constructor argument) and 315 (the callable passed to std::async, presumably
// the region-parsing function) are missing from this extraction.
268  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
269  int fragment_id) {
270  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
271 
272  CHECK(!column_id_to_chunk_map.empty());
273  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
274  CHECK(!file_regions.empty());
275 
276  const auto buffer_size = get_buffer_size(file_regions);
277  const auto thread_count = get_thread_count(copy_params, file_regions);
278 
// Ceiling division: regions are split into thread_count contiguous batches.
279  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
280 
281  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
282  parse_file_requests.reserve(thread_count);
283  std::vector<std::future<ParseFileRegionResult>> futures{};
284  std::set<int> column_filter_set;
285  for (const auto& pair : column_id_to_chunk_map) {
286  column_filter_set.insert(pair.first);
287  }
288 
// Each batch gets its own reader, deserialized from the shared reader state.
289  std::vector<std::unique_ptr<CsvReader>> csv_readers;
290  rapidjson::Value reader_metadata(rapidjson::kObjectType);
291  rapidjson::Document d;
292  auto& server_options = foreign_table_->foreign_server->options;
293  csv_reader_->serialize(reader_metadata, d.GetAllocator());
294  const auto csv_file_path = getFullFilePath(foreign_table_);
295 
296  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
297  parse_file_requests.emplace_back(buffer_size,
298  copy_params,
299  db_id_,
301  column_filter_set,
302  csv_file_path);
303  auto start_index = i;
304  auto end_index =
305  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
306 
307  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
308  csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
309  csv_file_path, copy_params, reader_metadata));
310  } else {
311  UNREACHABLE();
312  }
313 
314  futures.emplace_back(std::async(std::launch::async,
316  std::ref(file_regions),
317  start_index,
318  end_index,
319  std::ref(*(csv_readers.back())),
320  std::ref(parse_file_requests.back()),
321  std::ref(column_id_to_chunk_map)));
322  }
323 
324  std::vector<ParseFileRegionResult> load_file_region_results{};
325  for (auto& future : futures) {
326  future.wait();
327  load_file_region_results.emplace_back(future.get());
328  }
329 
// Append in batch order so row order within the fragment is preserved.
330  for (auto result : load_file_region_results) {
331  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
332  chunk.appendData(
333  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
334  }
335  }
336 }
337 
344 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
345  const size_t max_fragment_size,
346  const size_t buffer_row_count) {
347  CHECK(buffer_row_count > 0);
348  std::vector<size_t> partitions{};
349  size_t remaining_rows_in_last_fragment;
350  if (start_row_index % max_fragment_size == 0) {
351  remaining_rows_in_last_fragment = 0;
352  } else {
353  remaining_rows_in_last_fragment =
354  max_fragment_size - (start_row_index % max_fragment_size);
355  }
356  if (buffer_row_count <= remaining_rows_in_last_fragment) {
357  partitions.emplace_back(buffer_row_count);
358  } else {
359  if (remaining_rows_in_last_fragment > 0) {
360  partitions.emplace_back(remaining_rows_in_last_fragment);
361  }
362  size_t remaining_buffer_row_count =
363  buffer_row_count - remaining_rows_in_last_fragment;
364  while (remaining_buffer_row_count > 0) {
365  partitions.emplace_back(
366  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
367  remaining_buffer_row_count -= partitions.back();
368  }
369  }
370  return partitions;
371 }
372 
// Shared state for the producer/consumer metadata scan: a queue of requests
// waiting to be parsed, a pool of reusable request objects, and per-chunk
// encoder buffers / cached chunks built up by the workers.
// NOTE(review): original lines 378-380/385/388 are missing from this
// extraction — presumably the struct header, pending_requests_mutex,
// chunk_encoder_buffers_mutex and the continue_processing flag.
379  std::queue<csv_file_buffer_parser::ParseBufferRequest> pending_requests;
381  std::condition_variable pending_requests_condition;
382  std::queue<csv_file_buffer_parser::ParseBufferRequest> request_pool;
383  std::mutex request_pool_mutex;
384  std::condition_variable request_pool_condition;
386  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
387  std::map<ChunkKey, Chunk_NS::Chunk> cached_chunks;
389 };
390 
// Worker side of the scan pipeline: blocks until a pending request is
// available or processing has been cancelled. Returns an empty optional on
// shutdown so workers can exit their loop.
395 std::optional<csv_file_buffer_parser::ParseBufferRequest> get_next_metadata_scan_request(
396  MetadataScanMultiThreadingParams& multi_threading_params) {
397  std::unique_lock<std::mutex> pending_requests_lock(
398  multi_threading_params.pending_requests_mutex);
// Wake on new work OR cancellation (continue_processing cleared).
399  multi_threading_params.pending_requests_condition.wait(
400  pending_requests_lock, [&multi_threading_params] {
401  return !multi_threading_params.pending_requests.empty() ||
402  !multi_threading_params.continue_processing;
403  });
// Empty queue here means we were woken by cancellation, not by work.
404  if (multi_threading_params.pending_requests.empty()) {
405  return {};
406  }
407  auto request = std::move(multi_threading_params.pending_requests.front());
408  multi_threading_params.pending_requests.pop();
// Unlock before notifying so woken threads can acquire the mutex immediately.
409  pending_requests_lock.unlock();
410  multi_threading_params.pending_requests_condition.notify_all();
411  return std::move(request);
412 }
413 
// Records a parsed region of the file against its fragment: start offset,
// first row index, row count, and byte length (derived from the parse
// result's row offsets).
// NOTE(review): original line 421 — a parameter declaring `result` (the parse
// buffer result) — is missing from this extraction. `file_path` is accepted
// but not used here (see the inline note below about CsvReader).
418 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
419  int fragment_id,
420  size_t first_row_index,
422  const std::string& file_path) {
423  fragment_id_to_file_regions_map[fragment_id].emplace_back(
424  // file naming is handled by CsvReader
425  FileRegion(result.row_offsets.front(),
426  first_row_index,
427  result.row_count,
428  result.row_offsets.back() - result.row_offsets.front()));
429 }
430 
435 void update_stats(Encoder* encoder,
436  const SQLTypeInfo& column_type,
437  DataBlockPtr data_block,
438  const size_t row_count) {
439  if (column_type.is_array()) {
440  encoder->updateStats(data_block.arraysPtr, 0, row_count);
441  } else if (!column_type.is_varlen()) {
442  encoder->updateStats(data_block.numbersPtr, row_count);
443  } else {
444  encoder->updateStats(data_block.stringsPtr, 0, row_count);
445  }
446 }
447 namespace {
449  std::shared_ptr<Catalog_Namespace::Catalog>& catalog) {
450  if (catalog->getDataMgr()
451  .getPersistentStorageMgr()
452  ->getDiskCacheConfig()
453  .isEnabledForFSI()) {
454  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
455  } else {
456  return nullptr;
457  }
458 }
459 } // namespace
460 
461 // If cache is enabled, populate cached_chunks buffers with data blocks
// Appends one data block into a cache-backed chunk for `chunk_key`, lazily
// creating the chunk (and its index buffer for varlen columns) on the first
// block, and registering the chunk with the cache's eviction tracking on the
// last block. No-op when the disk cache is disabled.
// NOTE(review): original line 470 — the right-hand side initializing `catalog`
// — is missing from this extraction.
462 void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
463  DataBlockPtr data_block,
464  size_t row_count,
465  ChunkKey& chunk_key,
466  const ColumnDescriptor* column,
467  bool is_first_block,
468  bool is_last_block) {
469  auto catalog =
471  CHECK(catalog);
472  auto cache = get_cache_if_enabled(catalog);
473  if (cache) {
// Index chunk key: same as data key but with trailing subscript 2.
474  ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
475  chunk_key[CHUNK_KEY_TABLE_IDX],
476  chunk_key[CHUNK_KEY_COLUMN_IDX],
477  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
478  2};
479  // Create actual data chunks to prepopulate cache
480  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
481  cached_chunks[chunk_key] = Chunk_NS::Chunk{column};
482  cached_chunks[chunk_key].setBuffer(
483  cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
484  if (column->columnType.is_varlen_indeed()) {
485  cached_chunks[chunk_key].setIndexBuffer(
486  cache->getChunkBufferForPrecaching(index_key, is_first_block));
487  }
488  if (is_first_block) {
489  cached_chunks[chunk_key].initEncoder();
490  }
491  }
492  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
493  if (is_last_block) {
494  // cache the chunks now so they are tracked by eviction algorithm
495  std::vector<ChunkKey> key_to_cache{chunk_key};
496  if (column->columnType.is_varlen_indeed()) {
497  key_to_cache.push_back(index_key);
498  }
499  cache->cacheTableChunks(key_to_cache);
500  }
501  }
502 }
503 
// Consumes one parse result under the encoder-buffer lock: records the file
// region, then for each column's data block updates the per-chunk encoder
// stats/element counts and forwards the block to the disk cache.
// NOTE(review): original lines 509/511/512 are missing from this extraction —
// presumably the function signature and the `request`/`result` parameters.
510  int fragment_id,
513  std::map<int, const ColumnDescriptor*>& column_by_id,
514  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
515  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
516  // File regions should be added in same order as appendData
517  add_file_region(fragment_id_to_file_regions_map,
518  fragment_id,
519  request.first_row_index,
520  result,
521  request.getFilePath());
522 
523  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
524  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
525  const auto column = column_by_id[column_id];
526  if (column->columnType.is_varlen_indeed()) {
527  chunk_key.emplace_back(1);
528  }
// Lazily create the encoder buffer the first time this chunk is seen.
529  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
530  multi_threading_params.chunk_encoder_buffers.end()) {
531  multi_threading_params.chunk_encoder_buffers[chunk_key] =
532  std::make_unique<ForeignStorageBuffer>();
533  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
534  column->columnType);
535  }
536  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
537  column->columnType,
538  data_block,
539  result.row_count);
540  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
541  ->getEncoder()
542  ->getNumElems() +
543  result.row_count;
544  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
545  num_elements);
546  cache_blocks(
547  multi_threading_params.cached_chunks,
548  data_block,
549  result.row_count,
550  chunk_key,
551  column,
552  (num_elements - result.row_count) == 0, // Is the first block added to this chunk
553  num_elements == request.getMaxFragRows() // Is the last block for this chunk
554  );
555  }
556 }
557 
564  std::unique_lock<std::mutex> completed_requests_queue_lock(
565  multi_threading_params.request_pool_mutex);
566  multi_threading_params.request_pool.emplace(std::move(request));
567  completed_requests_queue_lock.unlock();
568  multi_threading_params.request_pool_condition.notify_all();
569 }
570 
// Worker loop for the metadata scan: pulls pending parse requests, splits
// each buffer into per-fragment partitions, parses them, and hands the
// results to process_data_blocks(). On error it cancels the whole pipeline
// (clearing continue_processing) before rethrowing; in all cases the request
// object is returned to the pool so the producer is never starved.
575 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
576  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
577  std::map<int, const ColumnDescriptor*> column_by_id{};
578  while (true) {
// Blocks for work; empty optional signals shutdown/cancellation.
579  auto request_opt = get_next_metadata_scan_request(multi_threading_params);
580  if (!request_opt.has_value()) {
581  break;
582  }
583  auto& request = request_opt.value();
584  try {
// Column lookup is built lazily from the first request.
585  if (column_by_id.empty()) {
586  for (const auto column : request.getColumns()) {
587  column_by_id[column->columnId] = column;
588  }
589  }
590  auto partitions = partition_by_fragment(
591  request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
592  request.begin_pos = 0;
593  size_t row_index = request.first_row_index;
594  for (const auto partition : partitions) {
595  request.process_row_count = partition;
// Import buffers are reused across partitions, so clear between parses.
596  for (const auto& import_buffer : request.import_buffers) {
597  if (import_buffer != nullptr) {
598  import_buffer->clear();
599  }
600  }
601  auto result = parse_buffer(request, true);
602  int fragment_id = row_index / request.getMaxFragRows();
603  process_data_blocks(multi_threading_params,
604  fragment_id,
605  request,
606  result,
607  column_by_id,
608  fragment_id_to_file_regions_map);
609  row_index += result.row_count;
// Advance past the rows just parsed within this buffer.
610  request.begin_pos = result.row_offsets.back() - request.file_offset;
611  }
612  } catch (...) {
613  // Re-add request to pool so we dont block any other threads
614  {
615  std::lock_guard<std::mutex> pending_requests_lock(
616  multi_threading_params.pending_requests_mutex);
617  multi_threading_params.continue_processing = false;
618  }
619  add_request_to_pool(multi_threading_params, request);
620  throw;
621  }
622  add_request_to_pool(multi_threading_params, request);
623  }
624 }
625 
// Producer side of the scan pipeline: blocks until a reusable request object
// is available in the pool, removes it, and returns it by value (moved out).
// NOTE(review): the function signature line (original line ~629) is missing
// from this extraction.
630  MetadataScanMultiThreadingParams& multi_threading_params) {
631  std::unique_lock<std::mutex> request_pool_lock(
632  multi_threading_params.request_pool_mutex);
633  multi_threading_params.request_pool_condition.wait(
634  request_pool_lock,
635  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
636  auto request = std::move(multi_threading_params.request_pool.front());
637  multi_threading_params.request_pool.pop();
638  request_pool_lock.unlock();
// Pooled requests must carry an allocated buffer.
639  CHECK(request.buffer);
640  return request;
641 }
642 
// Queues a filled request for the worker threads and wakes them. The request
// is moved into the pending queue under the pending-requests lock; the notify
// happens after the lock scope ends.
// NOTE(review): original lines 647/649 — the function signature line and the
// `request` parameter — are missing from this extraction.
648  MetadataScanMultiThreadingParams& multi_threading_params,
650  {
651  std::unique_lock<std::mutex> pending_requests_lock(
652  multi_threading_params.pending_requests_mutex);
653  multi_threading_params.pending_requests.emplace(std::move(request));
654  }
655  multi_threading_params.pending_requests_condition.notify_all();
656 }
657 
662 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
663  size_t& buffer_size,
664  const size_t alloc_size) {
665  CHECK_LE(buffer_size, alloc_size);
666  if (buffer_size < alloc_size) {
667  buffer = std::make_unique<char[]>(alloc_size);
668  buffer_size = alloc_size;
669  }
670 }
671 
// Producer loop of the metadata scan: repeatedly fills a pooled request
// buffer from the CSV reader (prepending any residual partial row from the
// previous read), locates the last complete row, dispatches the request to
// the workers, and carries the incomplete tail over to the next iteration.
// After the reader is exhausted (or processing is cancelled) it waits for the
// pending queue to drain and then signals shutdown to the workers.
// NOTE(review): the function signature line (original line ~676) and line 719
// (the call computing end_pos — presumably the "find row end position" helper)
// are missing from this extraction.
677  const size_t& buffer_size,
678  const std::string& file_path,
679  CsvReader& csv_reader,
680  const import_export::CopyParams& copy_params,
681  MetadataScanMultiThreadingParams& multi_threading_params,
682  size_t& first_row_index_in_buffer,
683  size_t& current_file_offset) {
684  auto alloc_size = buffer_size;
685  auto residual_buffer = std::make_unique<char[]>(alloc_size);
686  size_t residual_buffer_size = 0;
687  size_t residual_buffer_alloc_size = alloc_size;
688 
689  while (!csv_reader.isScanFinished()) {
// Bail out early if a worker hit an error and cancelled processing.
690  {
691  std::lock_guard<std::mutex> pending_requests_lock(
692  multi_threading_params.pending_requests_mutex);
693  if (!multi_threading_params.continue_processing) {
694  break;
695  }
696  }
697  auto request = get_request_from_pool(multi_threading_params);
698  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
699 
// Start the buffer with the partial row left over from the previous read.
700  if (residual_buffer_size > 0) {
701  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
702  }
703  size_t size = residual_buffer_size;
704  size += csv_reader.read(request.buffer.get() + residual_buffer_size,
705  alloc_size - residual_buffer_size);
706 
707  if (size == 0) {
708  // In some cases at the end of a file we will read 0 bytes even when
709  // csv_reader.isScanFinished() is false
710  continue;
711  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
712  // In some cases files with newlines at the end will be encoded with a second
713  // newline that can end up being the only thing in the buffer
714  current_file_offset++;
715  continue;
716  }
717  unsigned int num_rows_in_buffer = 0;
718  request.end_pos =
720  request.buffer,
721  size,
722  copy_params,
723  first_row_index_in_buffer,
724  num_rows_in_buffer,
725  nullptr,
726  &csv_reader);
727  request.buffer_size = size;
728  request.buffer_alloc_size = alloc_size;
729  request.first_row_index = first_row_index_in_buffer;
730  request.file_offset = current_file_offset;
731  request.buffer_row_count = num_rows_in_buffer;
732 
// Save the incomplete tail (bytes past the last complete row) for next round.
733  residual_buffer_size = size - request.end_pos;
734  if (residual_buffer_size > 0) {
735  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
736  memcpy(residual_buffer.get(),
737  request.buffer.get() + request.end_pos,
738  residual_buffer_size);
739  }
740 
741  current_file_offset += request.end_pos;
742  first_row_index_in_buffer += num_rows_in_buffer;
743 
744  dispatch_metadata_scan_request(multi_threading_params, request);
745  }
746 
// Wait for workers to drain the queue, then signal shutdown.
747  std::unique_lock<std::mutex> pending_requests_queue_lock(
748  multi_threading_params.pending_requests_mutex);
749  multi_threading_params.pending_requests_condition.wait(
750  pending_requests_queue_lock, [&multi_threading_params] {
751  return multi_threading_params.pending_requests.empty() ||
752  (multi_threading_params.continue_processing == false);
753  });
754  multi_threading_params.continue_processing = false;
755  pending_requests_queue_lock.unlock();
756  multi_threading_params.pending_requests_condition.notify_all();
757 }
758 
759 namespace {
760 // Create metadata for unscanned columns
761 // Any fragments with any updated rows between start_row and num_rows will be updated
762 // Chunks prior to start_row will be restored from (ie for append
763 // workflows)
// For a column skipped during the scan, writes placeholder metadata for every
// fragment in [start_row/maxFragRows, total_num_rows/maxFragRows], sizing the
// last fragment by the remainder.
// NOTE(review): the function signature line (original line 764) is missing
// from this extraction.
765  const ColumnDescriptor* column,
766  const ForeignTable* foreign_table,
767  const int db_id,
768  const size_t start_row,
769  const size_t total_num_rows,
770  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
771  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
772  if (column->columnType.is_varlen_indeed()) {
773  chunk_key.emplace_back(1);
774  }
775 
776  // Create placeholder metadata for every fragment touched by this scan
777  int start_fragment = start_row / foreign_table->maxFragRows;
778  int end_fragment = total_num_rows / foreign_table->maxFragRows;
779  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
// Last fragment holds only the remainder of rows; earlier ones are full.
780  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
781  (fragment_id + 1)) > total_num_rows)
782  ? total_num_rows % foreign_table->maxFragRows
783  : foreign_table->maxFragRows;
784 
785  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
786  chunk_metadata_map[chunk_key] = Csv::get_placeholder_metadata(column, num_elements);
787  }
788 }
789 
790 } // namespace
791 
// Scans the CSV source (or the appended portion of it, in append mode) with a
// pool of worker threads, building per-chunk encoder metadata, file-region
// maps, and optionally pre-populating the disk cache; finally emits all
// metadata into chunk_metadata_vector.
// NOTE(review): this extraction is missing the function signature (original
// lines ~792-804) and several statements (e.g. lines 809, 821, 828, 868-875,
// 879, 885, 924) including the catalog acquisition, the worker/producer
// function names passed to std::async, and the placeholder-metadata call.
805  auto timer = DEBUG_TIMER(__func__);
806 
807  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
808  const auto file_path = getFullFilePath(foreign_table_);
810  CHECK(catalog);
811  auto& server_options = foreign_table_->foreign_server->options;
// Append mode with an existing reader: only check for newly appended rows.
812  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
813  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
814  csv_reader_->checkForMoreRows(append_start_offset_);
815  } else {
816  UNREACHABLE();
817  }
818  } else {
819  // Should only be called once for non-append tables
820  CHECK(chunk_metadata_map_.empty());
822  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
823  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
824  } else {
825  UNREACHABLE();
826  }
827  num_rows_ = 0;
829  }
830 
831  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
832  foreign_table_->tableId, false, false, true);
833  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
834  for (auto column : columns) {
835  column_by_id[column->columnId] = column;
836  }
837  MetadataScanMultiThreadingParams multi_threading_params;
838 
839  // Restore previous chunk data
840  if (foreign_table_->isAppendMode()) {
841  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
842  }
843 
844  std::set<int> columns_to_scan;
845  for (auto column : columns) {
846  if (!skip_metadata_scan(column)) {
847  columns_to_scan.insert(column->columnId);
848  }
849  }
850 
851  // Track where scan started for appends
852  int start_row = num_rows_;
853  if (!csv_reader_->isScanFinished()) {
854  auto buffer_size = get_buffer_size(copy_params,
855  csv_reader_->isRemainingSizeKnown(),
856  csv_reader_->getRemainingSize());
857  auto thread_count = get_thread_count(copy_params,
858  csv_reader_->isRemainingSizeKnown(),
859  csv_reader_->getRemainingSize(),
860  buffer_size);
861  multi_threading_params.continue_processing = true;
862 
// One pooled request and one worker future per thread.
863  std::vector<std::future<void>> futures{};
864  for (size_t i = 0; i < thread_count; i++) {
865  multi_threading_params.request_pool.emplace(buffer_size,
866  copy_params,
867  db_id_,
869  columns_to_scan,
871 
872  futures.emplace_back(std::async(std::launch::async,
874  std::ref(multi_threading_params),
876  }
877 
878  try {
880  file_path,
881  (*csv_reader_),
882  copy_params,
883  multi_threading_params,
884  num_rows_,
886  } catch (...) {
// Producer failed: cancel workers before rethrowing so they can exit.
887  {
888  std::unique_lock<std::mutex> pending_requests_lock(
889  multi_threading_params.pending_requests_mutex);
890  multi_threading_params.continue_processing = false;
891  }
892  multi_threading_params.pending_requests_condition.notify_all();
893  throw;
894  }
895 
896  for (auto& future : futures) {
897  // get() instead of wait() because we need to propagate potential exceptions.
898  future.get();
899  }
900  }
901 
// Convert each encoder buffer into final chunk metadata; varlen byte sizes
// come from the cached chunk buffers when available.
902  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
903  auto column_entry = column_by_id.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
904  CHECK(column_entry != column_by_id.end());
905  const auto& column_type = column_entry->second->columnType;
906  auto chunk_metadata = buffer->getEncoder()->getMetadata(column_type);
907  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
908  const auto& cached_chunks = multi_threading_params.cached_chunks;
909  if (!column_type.is_varlen_indeed()) {
910  chunk_metadata->numBytes = column_type.get_size() * chunk_metadata->numElements;
911  } else if (auto chunk_entry = cached_chunks.find(chunk_key);
912  chunk_entry != cached_chunks.end()) {
913  auto buffer = chunk_entry->second.getBuffer();
914  CHECK(buffer);
915  chunk_metadata->numBytes = buffer->size();
916  } else {
917  CHECK_EQ(chunk_metadata->numBytes, static_cast<size_t>(0));
918  }
919  chunk_metadata_map_[chunk_key] = chunk_metadata;
920  }
921 
// Columns skipped during the scan get placeholder metadata instead.
922  for (auto column : columns) {
923  if (skip_metadata_scan(column)) {
925  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
926  }
927  }
928 
929  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
930  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
931  }
932 
933  // Save chunk data
934  if (foreign_table_->isAppendMode()) {
935  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
936  }
937 
938  // Any incomplete chunks should be cached now
939  auto cache = get_cache_if_enabled(catalog);
940  if (cache) {
941  std::vector<ChunkKey> to_cache;
942  for (auto& [chunk_key, buffer] : multi_threading_params.cached_chunks) {
// Full chunks were cached by cache_blocks(); only partial ones remain.
943  if (buffer.getBuffer()->getEncoder()->getNumElems() !=
944  static_cast<size_t>(foreign_table_->maxFragRows)) {
945  if (column_by_id[chunk_key[CHUNK_KEY_COLUMN_IDX]]
946  ->columnType.is_varlen_indeed()) {
947  ChunkKey index_chunk_key = chunk_key;
948  index_chunk_key[4] = 2;
949  to_cache.push_back(chunk_key);
950  to_cache.push_back(index_chunk_key);
951  } else {
952  to_cache.push_back(chunk_key);
953  }
954  }
955  }
956  if (to_cache.size() > 0) {
957  cache->cacheTableChunks(to_cache);
958  }
959  }
960 }
961 
// Persists the wrapper's internal state (fragment/file-region map, reader
// state, row count and append offset) as a JSON document at file_path, so it
// can be restored later by restoreDataWrapperInternals().
// NOTE(review): original lines 967-968 (start of the add_value_to_object call
// for the fragment map) and 978 (start of the append_start_offset call) are
// missing from this extraction.
962 void CsvDataWrapper::serializeDataWrapperInternals(const std::string& file_path) const {
963  rapidjson::Document d;
964  d.SetObject();
965 
966  // Save fragment map
969  "fragment_id_to_file_regions_map",
970  d.GetAllocator());
971 
972  // Save csv_reader metadata
973  rapidjson::Value reader_metadata(rapidjson::kObjectType);
974  csv_reader_->serialize(reader_metadata, d.GetAllocator());
975  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
976 
977  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
979  d, append_start_offset_, "append_start_offset", d.GetAllocator());
980 
981  json_utils::write_to_file(d, file_path);
982 }
983 
// Restores wrapper state previously written by serializeDataWrapperInternals:
// the fragment/file-region map, the CSV reader (from its serialized
// metadata), row count / append offset, and — in append mode — per-chunk
// encoder buffers reconstructed from the supplied chunk metadata.
// NOTE(review): original lines 984 (first signature line), 991 (start of the
// get_value_from_object call) and 1006 (num_rows_ restore) are missing from
// this extraction.
985  const std::string& file_path,
986  const ChunkMetadataVector& chunk_metadata) {
987  auto d = json_utils::read_from_file(file_path);
988  CHECK(d.IsObject());
989 
990  // Restore fragment map
992  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
993 
994  // Construct csv_reader with metadta
995  CHECK(d.HasMember("reader_metadata"));
996  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
997  const auto csv_file_path = getFullFilePath(foreign_table_);
998  auto& server_options = foreign_table_->foreign_server->options;
999  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
1000  csv_reader_ = std::make_unique<LocalMultiFileReader>(
1001  csv_file_path, copy_params, d["reader_metadata"]);
1002  } else {
1003  UNREACHABLE();
1004  }
1005 
1007  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
1008 
1009  // Now restore the internal metadata maps
1010  CHECK(chunk_metadata_map_.empty());
1011  CHECK(chunk_encoder_buffers_.empty());
1012 
1013  for (auto& pair : chunk_metadata) {
1014  chunk_metadata_map_[pair.first] = pair.second;
1015 
1016  if (foreign_table_->isAppendMode()) {
1017  // Restore encoder state for append mode
1018  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
1019  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
1020  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
1021  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
1022  pair.second->numElements);
1023  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
1024  pair.second->chunkStats);
1025  chunk_encoder_buffers_[pair.first]->setUpdated();
1026  }
1027  }
1028  is_restored_ = true;
1029 }
1030 
1032  return is_restored_;
1033 }
1034 
// CSV-specific table option names accepted by this wrapper, merged with the
// base wrapper's options in getAllCsvTableOptions().
1035 const std::set<std::string_view> CsvDataWrapper::csv_table_options_{"ARRAY_DELIMITER",
1036  "ARRAY_MARKER",
1037  "BUFFER_SIZE",
1038  "DELIMITER",
1039  "ESCAPE",
1040  "HEADER",
1041  "LINE_DELIMITER",
1042  "LONLAT",
1043  "NULLS",
1044  "QUOTE",
1045  "QUOTED",
1046  "S3_ACCESS_TYPE"};
1047 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool isRestored() const override
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
void serializeDataWrapperInternals(const std::string &file_path) const override
static const std::set< std::string_view > csv_table_options_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool is_last_block)
tuple d
Definition: test_fsi.py:9
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
bool is_varlen() const
Definition: sqltypes.h:514
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
virtual size_t read(void *buffer, size_t max_size)=0
#define UNREACHABLE()
Definition: Logger.h:247
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
Constants for Builtin SQL Types supported by OmniSci.
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
std::unique_ptr< CsvReader > csv_reader_
std::set< std::string_view > getAllCsvTableOptions() const
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:235
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
void validateTableOptions(const ForeignTable *foreign_table) const override
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:121
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result, const std::string &file_path)
std::vector< FileRegion > FileRegions
Definition: CsvShared.h:71
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
static SysCatalog & instance()
Definition: SysCatalog.h:292
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const ChunkToBufferMap &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::map< ChunkKey, Chunk_NS::Chunk > cached_chunks
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
rapidjson::Document read_from_file(const std::string &file_path)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
std::map< int, FileRegions > fragment_id_to_file_regions_map_
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool is_dict_encoded_type() const
Definition: sqltypes.h:530
bool operator<(const ParseFileRegionResult &other) const
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered)
std::queue< csv_file_buffer_parser::ParseBufferRequest > request_pool
specifies the content in-memory of a row in the column metadata table
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool isAppendMode() const
Checks if the table is in append mode.
const std::set< std::string_view > & getSupportedTableOptions() const override
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void validateTableOptions(const ForeignTable *foreign_table) const override
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
#define CHECK_LE(x, y)
Definition: Logger.h:214
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::set< const ColumnDescriptor * > get_columns(const ChunkToBufferMap &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
std::queue< csv_file_buffer_parser::ParseBufferRequest > pending_requests
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
virtual bool isScanFinished()=0
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
const std::set< std::string_view > & getSupportedTableOptions() const override
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:520
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
int8_t * numbersPtr
Definition: sqltypes.h:220
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
bool is_array() const
Definition: sqltypes.h:497
Chunk_NS::Chunk make_chunk_for_column(const ChunkKey &chunk_key, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers)
Definition: CsvShared.cpp:183
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31