OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "CsvDataWrapper.h"
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 
23 #include <rapidjson/document.h>
24 #include <boost/filesystem.hpp>
25 
30 #include "ImportExport/Importer.h"
31 #include "Utils/DdlUtils.h"
32 #include "FsiJsonUtils.h"
33 
34 namespace foreign_storage {
35 CsvDataWrapper::CsvDataWrapper(const int db_id, const ForeignTable* foreign_table)
36  : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
37 
39  : db_id_(-1), foreign_table_(foreign_table), is_restored_(false) {}
40 
41 void CsvDataWrapper::validateOptions(const ForeignTable* foreign_table) {
42  CsvDataWrapper data_wrapper{foreign_table};
43  data_wrapper.validateAndGetCopyParams();
44  data_wrapper.validateFilePath();
45 }
46 
47 std::vector<std::string_view> CsvDataWrapper::getSupportedOptions() {
48  return std::vector<std::string_view>{supported_options_.begin(),
49  supported_options_.end()};
50 }
51 
53  auto& server_options = foreign_table_->foreign_server->options;
54  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
58  }
59 }
60 
62  import_export::CopyParams copy_params{};
63  copy_params.plain_text = true;
64  if (const auto& value = validateAndGetStringWithLength("ARRAY_DELIMITER", 1);
65  !value.empty()) {
66  copy_params.array_delim = value[0];
67  }
68  if (const auto& value = validateAndGetStringWithLength("ARRAY_MARKER", 2);
69  !value.empty()) {
70  copy_params.array_begin = value[0];
71  copy_params.array_end = value[1];
72  }
73  if (auto it = foreign_table_->options.find("BUFFER_SIZE");
74  it != foreign_table_->options.end()) {
75  copy_params.buffer_size = std::stoi(it->second);
76  }
77  if (const auto& value = validateAndGetStringWithLength("DELIMITER", 1);
78  !value.empty()) {
79  copy_params.delimiter = value[0];
80  }
81  if (const auto& value = validateAndGetStringWithLength("ESCAPE", 1); !value.empty()) {
82  copy_params.escape = value[0];
83  }
84  auto has_header = validateAndGetBoolValue("HEADER");
85  if (has_header.has_value()) {
86  if (has_header.value()) {
87  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
88  } else {
89  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
90  }
91  }
92  if (const auto& value = validateAndGetStringWithLength("LINE_DELIMITER", 1);
93  !value.empty()) {
94  copy_params.line_delim = value[0];
95  }
96  copy_params.lonlat = validateAndGetBoolValue("LONLAT").value_or(copy_params.lonlat);
97 
98  if (auto it = foreign_table_->options.find("NULLS");
99  it != foreign_table_->options.end()) {
100  copy_params.null_str = it->second;
101  }
102  if (const auto& value = validateAndGetStringWithLength("QUOTE", 1); !value.empty()) {
103  copy_params.quote = value[0];
104  }
105  copy_params.quoted = validateAndGetBoolValue("QUOTED").value_or(copy_params.quoted);
106  return copy_params;
107 }
108 
110  const std::string& option_name,
111  const size_t expected_num_chars) {
112  if (auto it = foreign_table_->options.find(option_name);
113  it != foreign_table_->options.end()) {
114  if (it->second.length() != expected_num_chars) {
115  throw std::runtime_error{"Value of \"" + option_name +
116  "\" foreign table option has the wrong number of "
117  "characters. Expected " +
118  std::to_string(expected_num_chars) + " character(s)."};
119  }
120  return it->second;
121  }
122  return "";
123 }
124 
126  const std::string& option_name) {
127  if (auto it = foreign_table_->options.find(option_name);
128  it != foreign_table_->options.end()) {
129  if (boost::iequals(it->second, "TRUE")) {
130  return true;
131  } else if (boost::iequals(it->second, "FALSE")) {
132  return false;
133  } else {
134  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
135  "\" foreign table option. "
136  "Value must be either 'true' or 'false'."};
137  }
138  }
139  return std::nullopt;
140 }
141 
142 namespace {
143 std::set<const ColumnDescriptor*> get_columns(
144  const std::map<ChunkKey, AbstractBuffer*>& buffers,
145  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
146  const int32_t table_id,
147  const int fragment_id) {
148  CHECK(!buffers.empty());
149  std::set<const ColumnDescriptor*> columns;
150  for (const auto& entry : buffers) {
151  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
152  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
153  const auto column = catalog->getMetadataForColumnUnlocked(table_id, column_id);
154  columns.emplace(column);
155  }
156  return columns;
157 }
158 } // namespace
159 
161  const std::set<const ColumnDescriptor*>& columns,
162  const int fragment_id,
163  const std::map<ChunkKey, AbstractBuffer*>& buffers,
164  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
165  for (const auto column : columns) {
166  ChunkKey data_chunk_key;
167  AbstractBuffer* data_buffer = nullptr;
168  AbstractBuffer* index_buffer = nullptr;
169  if (column->columnType.is_varlen_indeed()) {
170  data_chunk_key = {
171  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 1};
172  ChunkKey index_chunk_key = {
173  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 2};
174 
175  CHECK(buffers.find(data_chunk_key) != buffers.end());
176  CHECK(buffers.find(index_chunk_key) != buffers.end());
177 
178  data_buffer = buffers.find(data_chunk_key)->second;
179  index_buffer = buffers.find(index_chunk_key)->second;
180 
181  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
182  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
183 
184  size_t index_offset_size{0};
185  if (column->columnType.is_string() || column->columnType.is_geometry()) {
186  index_offset_size = sizeof(StringOffsetT);
187  } else if (column->columnType.is_array()) {
188  index_offset_size = sizeof(ArrayOffsetT);
189  } else {
190  UNREACHABLE();
191  }
192  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
193  index_buffer->reserve(index_offset_size *
194  (chunk_metadata_map_[data_chunk_key]->numElements + 1));
195  } else {
196  data_chunk_key = {db_id_, foreign_table_->tableId, column->columnId, fragment_id};
197  CHECK(buffers.find(data_chunk_key) != buffers.end());
198  data_buffer = buffers.find(data_chunk_key)->second;
199  }
200  data_buffer->reserve(chunk_metadata_map_[data_chunk_key]->numBytes);
201  column_id_to_chunk_map[column->columnId] = Chunk_NS::Chunk{column};
202  column_id_to_chunk_map[column->columnId].setBuffer(data_buffer);
203  column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
204  column_id_to_chunk_map[column->columnId].initEncoder();
205  }
206 }
207 
209  std::map<ChunkKey, AbstractBuffer*>& required_buffers,
210  std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
211  auto timer = DEBUG_TIMER(__func__);
213  CHECK(!required_buffers.empty());
214 
215  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
216  std::set<const ColumnDescriptor*> required_columns =
217  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
218  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
220  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
221 
222  if (!optional_buffers.empty()) {
223  std::set<const ColumnDescriptor*> optional_columns;
224  optional_columns =
225  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
227  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
228  }
229  populateChunks(column_id_to_chunk_map, fragment_id);
230 
231  for (auto& entry : column_id_to_chunk_map) {
232  entry.second.setBuffer(nullptr);
233  entry.second.setIndexBuffer(nullptr);
234  }
235 }
236 
241  size_t file_offset;
242  size_t row_count;
243  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
244 
245  bool operator<(const ParseFileRegionResult& other) const {
246  return file_offset < other.file_offset;
247  }
248 };
249 
255  const FileRegions& file_regions,
256  const size_t start_index,
257  const size_t end_index,
258  CsvReader& csv_reader,
259  std::mutex& file_access_mutex,
260  csv_file_buffer_parser::ParseBufferRequest& parse_file_request,
261  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
262  ParseFileRegionResult load_file_region_result{};
263  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
264  load_file_region_result.row_count = 0;
265 
267  for (size_t i = start_index; i <= end_index; i++) {
268  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
269  size_t read_size;
270  {
271  std::lock_guard<std::mutex> lock(file_access_mutex);
272  read_size = csv_reader.readRegion(parse_file_request.buffer.get(),
273  file_regions[i].first_row_file_offset,
274  file_regions[i].region_size);
275  }
276 
277  CHECK_EQ(file_regions[i].region_size, read_size);
278  parse_file_request.begin_pos = 0;
279  parse_file_request.end_pos = file_regions[i].region_size;
280  parse_file_request.first_row_index = file_regions[i].first_row_index;
281  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
282  parse_file_request.process_row_count = file_regions[i].row_count;
283 
284  result = parse_buffer(parse_file_request);
285  CHECK_EQ(file_regions[i].row_count, result.row_count);
286  load_file_region_result.row_count += result.row_count;
287  }
288  load_file_region_result.column_id_to_data_blocks_map =
290  return load_file_region_result;
291 }
292 
296 size_t get_buffer_size(const import_export::CopyParams& copy_params,
297  const bool size_known,
298  const size_t file_size) {
299  size_t buffer_size = copy_params.buffer_size;
300  if (size_known && file_size < buffer_size) {
301  buffer_size = file_size + 1; // +1 for end of line character, if missing
302  }
303  return buffer_size;
304 }
305 
306 size_t get_buffer_size(const FileRegions& file_regions) {
307  size_t buffer_size = 0;
308  for (const auto& file_region : file_regions) {
309  buffer_size = std::max(buffer_size, file_region.region_size);
310  }
311  CHECK(buffer_size);
312  return buffer_size;
313 }
314 
319 size_t get_thread_count(const import_export::CopyParams& copy_params,
320  const bool size_known,
321  const size_t file_size,
322  const size_t buffer_size) {
323  size_t thread_count = copy_params.threads;
324  if (thread_count == 0) {
325  thread_count = std::thread::hardware_concurrency();
326  }
327  if (size_known) {
328  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
329  if (num_buffers_in_file < thread_count) {
330  thread_count = num_buffers_in_file;
331  }
332  }
333  CHECK(thread_count);
334  return thread_count;
335 }
336 
337 size_t get_thread_count(const import_export::CopyParams& copy_params,
338  const FileRegions& file_regions) {
339  size_t thread_count = copy_params.threads;
340  if (thread_count == 0) {
341  thread_count =
342  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
343  }
344  CHECK(thread_count);
345  return thread_count;
346 }
347 
349  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
350  int fragment_id) {
351  const auto copy_params = validateAndGetCopyParams();
352 
353  CHECK(!column_id_to_chunk_map.empty());
354  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
355  CHECK(!file_regions.empty());
356 
357  const auto buffer_size = get_buffer_size(file_regions);
358  const auto thread_count = get_thread_count(copy_params, file_regions);
359 
360  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
361 
362  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
363  parse_file_requests.reserve(thread_count);
364  std::vector<std::future<ParseFileRegionResult>> futures{};
365  std::set<int> column_filter_set;
366  for (const auto& pair : column_id_to_chunk_map) {
367  column_filter_set.insert(pair.first);
368  }
369  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
370  parse_file_requests.emplace_back(
371  buffer_size, copy_params, db_id_, foreign_table_, column_filter_set);
372  auto start_index = i;
373  auto end_index =
374  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
375  futures.emplace_back(std::async(std::launch::async,
377  std::ref(file_regions),
378  start_index,
379  end_index,
380  std::ref((*csv_reader_)),
381  std::ref(file_access_mutex_),
382  std::ref(parse_file_requests.back()),
383  std::ref(column_id_to_chunk_map)));
384  }
385 
386  std::set<ParseFileRegionResult> load_file_region_results{};
387  for (auto& future : futures) {
388  future.wait();
389  load_file_region_results.emplace(future.get());
390  }
391 
392  for (auto result : load_file_region_results) {
393  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
394  chunk.appendData(
395  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
396  }
397  }
398 }
399 
406 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
407  const size_t max_fragment_size,
408  const size_t buffer_row_count) {
409  CHECK(buffer_row_count > 0);
410  std::vector<size_t> partitions{};
411  size_t remaining_rows_in_last_fragment;
412  if (start_row_index % max_fragment_size == 0) {
413  remaining_rows_in_last_fragment = 0;
414  } else {
415  remaining_rows_in_last_fragment =
416  max_fragment_size - (start_row_index % max_fragment_size);
417  }
418  if (buffer_row_count <= remaining_rows_in_last_fragment) {
419  partitions.emplace_back(buffer_row_count);
420  } else {
421  if (remaining_rows_in_last_fragment > 0) {
422  partitions.emplace_back(remaining_rows_in_last_fragment);
423  }
424  size_t remaining_buffer_row_count =
425  buffer_row_count - remaining_rows_in_last_fragment;
426  while (remaining_buffer_row_count > 0) {
427  partitions.emplace_back(
428  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
429  remaining_buffer_row_count -= partitions.back();
430  }
431  }
432  return partitions;
433 }
434 
441  std::queue<csv_file_buffer_parser::ParseBufferRequest> pending_requests;
443  std::condition_variable pending_requests_condition;
444  std::queue<csv_file_buffer_parser::ParseBufferRequest> request_pool;
445  std::mutex request_pool_mutex;
446  std::condition_variable request_pool_condition;
448  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
450  std::map<ChunkKey, size_t> chunk_byte_count;
452 };
453 
458 std::optional<csv_file_buffer_parser::ParseBufferRequest> get_next_metadata_scan_request(
459  MetadataScanMultiThreadingParams& multi_threading_params) {
460  std::unique_lock<std::mutex> pending_requests_lock(
461  multi_threading_params.pending_requests_mutex);
462  multi_threading_params.pending_requests_condition.wait(
463  pending_requests_lock, [&multi_threading_params] {
464  return !multi_threading_params.pending_requests.empty() ||
465  !multi_threading_params.continue_processing;
466  });
467  if (multi_threading_params.pending_requests.empty()) {
468  return {};
469  }
470  auto request = std::move(multi_threading_params.pending_requests.front());
471  multi_threading_params.pending_requests.pop();
472  pending_requests_lock.unlock();
473  multi_threading_params.pending_requests_condition.notify_all();
474  return std::move(request);
475 }
476 
481 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
482  std::mutex& file_region_mutex,
483  int fragment_id,
484  size_t first_row_index,
486  const std::string& file_path) {
487  std::lock_guard<std::mutex> lock(file_region_mutex);
488  fragment_id_to_file_regions_map[fragment_id].emplace_back(
489  FileRegion(file_path,
490  result.row_offsets.front(),
491  first_row_index,
492  result.row_count,
493  result.row_offsets.back() - result.row_offsets.front()));
494 }
495 
501  SQLTypeInfo sql_type_info) {
502  CHECK(sql_type_info.is_varlen());
503  size_t byte_count = 0;
504  if (sql_type_info.is_string() || sql_type_info.is_geometry()) {
505  for (const auto& str : *data_block.stringsPtr) {
506  byte_count += str.length();
507  }
508  } else if (sql_type_info.is_array()) {
509  for (const auto& array : *data_block.arraysPtr) {
510  byte_count += array.length;
511  }
512  } else {
513  UNREACHABLE();
514  }
515  return byte_count;
516 }
517 
522 void update_stats(Encoder* encoder,
523  const SQLTypeInfo& column_type,
524  DataBlockPtr data_block,
525  const size_t row_count) {
526  if (column_type.is_array()) {
527  encoder->updateStats(data_block.arraysPtr, 0, row_count);
528  } else if (!column_type.is_varlen()) {
529  encoder->updateStats(data_block.numbersPtr, row_count);
530  } else {
531  encoder->updateStats(data_block.stringsPtr, 0, row_count);
532  }
533 }
534 
540  int fragment_id,
543  std::map<int, const ColumnDescriptor*>& column_by_id) {
544  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
545  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
546  const auto column = column_by_id[column_id];
547  size_t byte_count;
548  if (column->columnType.is_varlen_indeed()) {
549  chunk_key.emplace_back(1);
550  byte_count = get_var_length_data_block_size(data_block, column->columnType);
551  } else {
552  byte_count = column->columnType.get_size() * result.row_count;
553  }
554 
555  {
556  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_byte_count_mutex);
557  multi_threading_params.chunk_byte_count[chunk_key] += byte_count;
558  }
559 
560  {
561  std::lock_guard<std::mutex> lock(
562  multi_threading_params.chunk_encoder_buffers_mutex);
563  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
564  multi_threading_params.chunk_encoder_buffers.end()) {
565  multi_threading_params.chunk_encoder_buffers[chunk_key] =
566  std::make_unique<ForeignStorageBuffer>();
567  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
568  column->columnType);
569  }
570  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
571  column->columnType,
572  data_block,
573  result.row_count);
574  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
575  ->getEncoder()
576  ->getNumElems() +
577  result.row_count;
578  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
579  num_elements);
580  }
581  }
582 }
583 
590  std::unique_lock<std::mutex> completed_requests_queue_lock(
591  multi_threading_params.request_pool_mutex);
592  multi_threading_params.request_pool.emplace(std::move(request));
593  completed_requests_queue_lock.unlock();
594  multi_threading_params.request_pool_condition.notify_all();
595 }
596 
601 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
602  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
603  std::mutex& file_region_mutex) {
604  std::map<int, const ColumnDescriptor*> column_by_id{};
605  while (true) {
606  auto request_opt = get_next_metadata_scan_request(multi_threading_params);
607  if (!request_opt.has_value()) {
608  break;
609  }
610  auto& request = request_opt.value();
611  if (column_by_id.empty()) {
612  for (const auto column : request.getColumns()) {
613  column_by_id[column->columnId] = column;
614  }
615  }
616  auto partitions = partition_by_fragment(
617  request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
618  request.begin_pos = 0;
619  size_t row_index = request.first_row_index;
620  for (const auto partition : partitions) {
621  request.process_row_count = partition;
622  for (const auto& import_buffer : request.import_buffers) {
623  import_buffer->clear();
624  }
625  auto result = parse_buffer(request);
626  int fragment_id = row_index / request.getMaxFragRows();
627  add_file_region(fragment_id_to_file_regions_map,
628  file_region_mutex,
629  fragment_id,
630  request.first_row_index,
631  result,
632  request.getFilePath());
633  update_metadata(multi_threading_params, fragment_id, request, result, column_by_id);
634  row_index += result.row_count;
635  request.begin_pos = result.row_offsets.back() - request.file_offset;
636  }
637  add_request_to_pool(multi_threading_params, request);
638  }
639 }
640 
645  MetadataScanMultiThreadingParams& multi_threading_params) {
646  std::unique_lock<std::mutex> request_pool_lock(
647  multi_threading_params.request_pool_mutex);
648  multi_threading_params.request_pool_condition.wait(
649  request_pool_lock,
650  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
651  auto request = std::move(multi_threading_params.request_pool.front());
652  multi_threading_params.request_pool.pop();
653  request_pool_lock.unlock();
654  CHECK(request.buffer);
655  return request;
656 }
657 
663  MetadataScanMultiThreadingParams& multi_threading_params,
665  {
666  std::unique_lock<std::mutex> pending_requests_lock(
667  multi_threading_params.pending_requests_mutex);
668  multi_threading_params.pending_requests.emplace(std::move(request));
669  }
670  multi_threading_params.pending_requests_condition.notify_all();
671 }
672 
677 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
678  size_t& buffer_size,
679  const size_t alloc_size) {
680  CHECK_LE(buffer_size, alloc_size);
681  if (buffer_size < alloc_size) {
682  buffer = std::make_unique<char[]>(alloc_size);
683  buffer_size = alloc_size;
684  }
685 }
686 
692  const size_t& buffer_size,
693  const std::string& file_path,
694  CsvReader& csv_reader,
695  const import_export::CopyParams& copy_params,
696  MetadataScanMultiThreadingParams& multi_threading_params,
697  size_t& first_row_index_in_buffer,
698  size_t& current_file_offset) {
699  auto alloc_size = buffer_size;
700  auto residual_buffer = std::make_unique<char[]>(alloc_size);
701  size_t residual_buffer_size = 0;
702  size_t residual_buffer_alloc_size = alloc_size;
703 
704  while (!csv_reader.isScanFinished()) {
705  auto request = get_request_from_pool(multi_threading_params);
706  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
707 
708  if (residual_buffer_size > 0) {
709  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
710  }
711  size_t size = residual_buffer_size;
712  size += csv_reader.read(request.buffer.get() + residual_buffer_size,
713  alloc_size - residual_buffer_size);
714 
715  if (size == 0) {
716  // In some cases at the end of a file we will read 0 bytes even when
717  // csv_reader.isScanFinished() is false
718  continue;
719  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
720  // In some cases files with newlines at the end will be encoded with a second
721  // newline that can end up being the only thing in the buffer
722  current_file_offset++;
723  continue;
724  }
725  unsigned int num_rows_in_buffer = 0;
726  request.end_pos =
728  request.buffer,
729  size,
730  copy_params,
731  first_row_index_in_buffer,
732  num_rows_in_buffer,
733  nullptr,
734  &csv_reader);
735  request.buffer_size = size;
736  request.buffer_alloc_size = alloc_size;
737  request.first_row_index = first_row_index_in_buffer;
738  request.file_offset = current_file_offset;
739  request.buffer_row_count = num_rows_in_buffer;
740 
741  residual_buffer_size = size - request.end_pos;
742  if (residual_buffer_size > 0) {
743  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
744  memcpy(residual_buffer.get(),
745  request.buffer.get() + request.end_pos,
746  residual_buffer_size);
747  }
748 
749  current_file_offset += request.end_pos;
750  first_row_index_in_buffer += num_rows_in_buffer;
751 
752  dispatch_metadata_scan_request(multi_threading_params, request);
753  }
754 
755  std::unique_lock<std::mutex> pending_requests_queue_lock(
756  multi_threading_params.pending_requests_mutex);
757  multi_threading_params.pending_requests_condition.wait(
758  pending_requests_queue_lock, [&multi_threading_params] {
759  return multi_threading_params.pending_requests.empty();
760  });
761  multi_threading_params.continue_processing = false;
762  pending_requests_queue_lock.unlock();
763  multi_threading_params.pending_requests_condition.notify_all();
764 }
765 
779  auto timer = DEBUG_TIMER(__func__);
780  chunk_metadata_map_.clear();
781 
782  const auto copy_params = validateAndGetCopyParams();
783  const auto file_path = foreign_table_->getFilePath();
785  auto& server_options = foreign_table_->foreign_server->options;
786  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
787  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
789  csv_reader_->checkForMoreRows(append_start_offset_);
790  } else {
791  UNREACHABLE();
792  }
793  } else {
795  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
797  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
798  } else {
799  UNREACHABLE();
800  }
801  num_rows_ = 0;
803  }
804 
805  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
806  foreign_table_->tableId, false, false, true);
807  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
808  for (auto column : columns) {
809  column_by_id[column->columnId] = column;
810  }
811  MetadataScanMultiThreadingParams multi_threading_params;
812 
813  // Restore previous chunk data
814  if (foreign_table_->isAppendMode()) {
815  multi_threading_params.chunk_byte_count = chunk_byte_count_;
816  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
817  }
818 
819  if (!csv_reader_->isScanFinished()) {
820  auto buffer_size = get_buffer_size(copy_params,
821  csv_reader_->isRemainingSizeKnown(),
822  csv_reader_->getRemainingSize());
823  auto thread_count = get_thread_count(copy_params,
824  csv_reader_->isRemainingSizeKnown(),
825  csv_reader_->getRemainingSize(),
826  buffer_size);
827  multi_threading_params.continue_processing = true;
828 
829  std::vector<std::future<void>> futures{};
830  for (size_t i = 0; i < thread_count; i++) {
831  multi_threading_params.request_pool.emplace(
832  buffer_size, copy_params, db_id_, foreign_table_);
833 
834  futures.emplace_back(std::async(std::launch::async,
836  std::ref(multi_threading_params),
838  std::ref(file_regions_mutex_)));
839  }
840 
841  try {
843  file_path,
844  (*csv_reader_),
845  copy_params,
846  multi_threading_params,
847  num_rows_,
849  } catch (...) {
850  {
851  std::unique_lock<std::mutex> pending_requests_lock(
852  multi_threading_params.pending_requests_mutex);
853  multi_threading_params.continue_processing = false;
854  }
855  multi_threading_params.pending_requests_condition.notify_all();
856  throw;
857  }
858 
859  for (auto& future : futures) {
860  // get() instead of wait() because we need to propagate potential exceptions.
861  future.get();
862  }
863  }
864 
865  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
866  auto chunk_metadata =
867  buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
868  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
869  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
870  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
871  chunk_metadata_map_[chunk_key] = chunk_metadata;
872  }
873 
874  // Save chunk data
875  if (foreign_table_->isAppendMode()) {
876  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
877  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
878  }
879 
880  for (auto& entry : fragment_id_to_file_regions_map_) {
881  std::sort(entry.second.begin(), entry.second.end());
882  }
883 }
884 
885 // Serialization functions for FileRegion
886 void set_value(rapidjson::Value& json_val,
887  const FileRegion& file_region,
888  rapidjson::Document::AllocatorType& allocator) {
889  json_val.SetObject();
891  json_val, file_region.first_row_file_offset, "first_row_file_offset", allocator);
893  json_val, file_region.first_row_index, "first_row_index", allocator);
895  json_val, file_region.region_size, "region_size", allocator);
897  json_val, file_region.row_count, "row_count", allocator);
898 }
899 
900 void get_value(const rapidjson::Value& json_val, FileRegion& file_region) {
901  CHECK(json_val.IsObject());
903  json_val, file_region.first_row_file_offset, "first_row_file_offset");
905  json_val, file_region.first_row_index, "first_row_index");
906  json_utils::get_value_from_object(json_val, file_region.region_size, "region_size");
907  json_utils::get_value_from_object(json_val, file_region.row_count, "row_count");
908 }
909 
910 void CsvDataWrapper::serializeDataWrapperInternals(const std::string& file_path) const {
911  rapidjson::Document d;
912  d.SetObject();
913 
914  // Save fragment map
917  "fragment_id_to_file_regions_map",
918  d.GetAllocator());
919 
920  // Save csv_reader metadata
921  rapidjson::Value reader_metadata(rapidjson::kObjectType);
922  csv_reader_->serialize(reader_metadata, d.GetAllocator());
923  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());
924 
925  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
927  d, append_start_offset_, "append_start_offset", d.GetAllocator());
928 
929  json_utils::write_to_file(d, file_path);
930 }
931 
933  const std::string& file_path,
934  const ChunkMetadataVector& chunk_metadata) {
935  auto d = json_utils::read_from_file(file_path);
936  CHECK(d.IsObject());
937 
938  // Restore fragment map
940  d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");
941 
942  // Construct csv_reader with metadata
943  CHECK(d.HasMember("reader_metadata"));
944  const auto copy_params = validateAndGetCopyParams();
945  const auto csv_file_path = foreign_table_->getFilePath();
946  auto& server_options = foreign_table_->foreign_server->options;
947  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
949  csv_reader_ = std::make_unique<LocalMultiFileReader>(
950  csv_file_path, copy_params, d["reader_metadata"]);
951  } else {
952  UNREACHABLE();
953  }
954 
956  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");
957 
958  // Now restore the internal metadata maps
959  CHECK(chunk_metadata_map_.empty());
960  CHECK(chunk_encoder_buffers_.empty());
961 
962  for (auto& pair : chunk_metadata) {
963  chunk_metadata_map_[pair.first] = pair.second;
964 
965  if (foreign_table_->isAppendMode()) {
966  // Restore encoder state for append mode
967  chunk_encoder_buffers_[pair.first] = std::make_unique<ForeignStorageBuffer>();
968  chunk_encoder_buffers_[pair.first]->initEncoder(pair.second->sqlType);
969  chunk_encoder_buffers_[pair.first]->setSize(pair.second->numBytes);
970  chunk_encoder_buffers_[pair.first]->getEncoder()->setNumElems(
971  pair.second->numElements);
972  chunk_encoder_buffers_[pair.first]->getEncoder()->resetChunkStats(
973  pair.second->chunkStats);
974  chunk_encoder_buffers_[pair.first]->setUpdated();
975  chunk_byte_count_[pair.first] = pair.second->numBytes;
976  }
977  }
978  is_restored_ = true;
979 }
980 
982  return is_restored_;
983 }
984 
985 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool isRestored() const override
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
size_t get_var_length_data_block_size(DataBlockPtr data_block, SQLTypeInfo sql_type_info)
void populateChunkBuffers(std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
void serializeDataWrapperInternals(const std::string &file_path) const override
std::map< ChunkKey, size_t > chunk_byte_count_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
static std::vector< std::string_view > getSupportedOptions()
static void validateOptions(const ForeignTable *foreign_table)
bool is_varlen() const
Definition: sqltypes.h:506
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
virtual size_t read(void *buffer, size_t max_size)=0
ParseBufferResult parse_buffer(ParseBufferRequest &request)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, std::mutex &file_access_mutex, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
#define UNREACHABLE()
Definition: Logger.h:241
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result, const std::string &file_path)
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
std::unique_ptr< CsvReader > csv_reader_
static constexpr std::array< char const *, 11 > supported_options_
std::string to_string(char const *&&v)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:108
int32_t StringOffsetT
Definition: sqltypes.h:947
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
std::vector< FileRegion > FileRegions
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
std::string validateAndGetStringWithLength(const std::string &option_name, const size_t expected_num_chars)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
rapidjson::Document read_from_file(const std::string &file_path)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
std::map< int, FileRegions > fragment_id_to_file_regions_map_
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3846
std::queue< csv_file_buffer_parser::ParseBufferRequest > request_pool
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:613
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::string getFilePath() const
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
CsvDataWrapper(const int db_id, const ForeignTable *foreign_table)
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::optional< bool > validateAndGetBoolValue(const std::string &option_name)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
int32_t ArrayOffsetT
Definition: sqltypes.h:948
std::queue< csv_file_buffer_parser::ParseBufferRequest > pending_requests
virtual bool isScanFinished()=0
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:63
bool g_enable_watchdog false
Definition: Execute.cpp:73
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:499
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
bool is_string() const
Definition: sqltypes.h:487
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex)
int8_t * numbersPtr
Definition: sqltypes.h:220
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer * > &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
void update_metadata(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, const csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id)
virtual void updateStats(const int64_t val, const bool is_null)=0
virtual void reserve(size_t num_bytes)=0
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
bool is_array() const
Definition: sqltypes.h:495
import_export::CopyParams validateAndGetCopyParams()
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31