OmniSciDB  8a228a1076
CsvDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "CsvDataWrapper.h"
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <regex>
23 
24 #include <boost/filesystem.hpp>
25 
29 #include "ImportExport/Importer.h"
30 #include "Utils/DdlUtils.h"
31 
32 namespace foreign_storage {
33 CsvDataWrapper::CsvDataWrapper(const int db_id, const ForeignTable* foreign_table)
34  : db_id_(db_id), foreign_table_(foreign_table) {}
35 
37  : db_id_(-1), foreign_table_(foreign_table) {}
38 
39 void CsvDataWrapper::validateOptions(const ForeignTable* foreign_table) {
40  for (const auto& entry : foreign_table->options) {
41  const auto& table_options = foreign_table->supported_options;
42  if (std::find(table_options.begin(), table_options.end(), entry.first) ==
43  table_options.end() &&
44  std::find(supported_options_.begin(), supported_options_.end(), entry.first) ==
45  supported_options_.end()) {
46  throw std::runtime_error{"Invalid foreign table option \"" + entry.first + "\"."};
47  }
48  }
49  CsvDataWrapper data_wrapper{foreign_table};
50  data_wrapper.validateAndGetCopyParams();
51  data_wrapper.validateFilePath();
52 }
53 
55  auto& server_options = foreign_table_->foreign_server->options;
56  auto file_path_entry = foreign_table_->options.find("FILE_PATH");
57  std::string file_path{};
58  if (file_path_entry != foreign_table_->options.end()) {
59  file_path = file_path_entry->second;
60  }
61  std::string base_path{};
62  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
64  auto base_path_entry = server_options.find(ForeignServer::BASE_PATH_KEY);
65  if (base_path_entry == server_options.end()) {
66  throw std::runtime_error{"No base path found in foreign server options."};
67  }
68  base_path = base_path_entry->second;
69  const std::string separator{boost::filesystem::path::preferred_separator};
70  return std::regex_replace(
71  base_path + separator + file_path, std::regex{separator + "{2,}"}, separator);
72  } else {
73  // Just return the file path as a prefix
74  return file_path;
75  }
76 }
77 
79  auto& server_options = foreign_table_->foreign_server->options;
80  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
84  }
85 }
86 
88  import_export::CopyParams copy_params{};
89  copy_params.plain_text = true;
90  if (const auto& value = validateAndGetStringWithLength("ARRAY_DELIMITER", 1);
91  !value.empty()) {
92  copy_params.array_delim = value[0];
93  }
94  if (const auto& value = validateAndGetStringWithLength("ARRAY_MARKER", 2);
95  !value.empty()) {
96  copy_params.array_begin = value[0];
97  copy_params.array_end = value[1];
98  }
99  if (auto it = foreign_table_->options.find("BUFFER_SIZE");
100  it != foreign_table_->options.end()) {
101  copy_params.buffer_size = std::stoi(it->second);
102  }
103  if (const auto& value = validateAndGetStringWithLength("DELIMITER", 1);
104  !value.empty()) {
105  copy_params.delimiter = value[0];
106  }
107  if (const auto& value = validateAndGetStringWithLength("ESCAPE", 1); !value.empty()) {
108  copy_params.escape = value[0];
109  }
110  auto has_header = validateAndGetBoolValue("HEADER");
111  if (has_header.has_value()) {
112  if (has_header.value()) {
113  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
114  } else {
115  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
116  }
117  }
118  if (const auto& value = validateAndGetStringWithLength("LINE_DELIMITER", 1);
119  !value.empty()) {
120  copy_params.line_delim = value[0];
121  }
122  copy_params.lonlat = validateAndGetBoolValue("LONLAT").value_or(copy_params.lonlat);
123 
124  if (auto it = foreign_table_->options.find("NULLS");
125  it != foreign_table_->options.end()) {
126  copy_params.null_str = it->second;
127  }
128  if (const auto& value = validateAndGetStringWithLength("QUOTE", 1); !value.empty()) {
129  copy_params.quote = value[0];
130  }
131  copy_params.quoted = validateAndGetBoolValue("QUOTED").value_or(copy_params.quoted);
132  return copy_params;
133 }
134 
136  const std::string& option_name,
137  const size_t expected_num_chars) {
138  if (auto it = foreign_table_->options.find(option_name);
139  it != foreign_table_->options.end()) {
140  if (it->second.length() != expected_num_chars) {
141  throw std::runtime_error{"Value of \"" + option_name +
142  "\" foreign table option has the wrong number of "
143  "characters. Expected " +
144  std::to_string(expected_num_chars) + " character(s)."};
145  }
146  return it->second;
147  }
148  return "";
149 }
150 
152  const std::string& option_name) {
153  if (auto it = foreign_table_->options.find(option_name);
154  it != foreign_table_->options.end()) {
155  if (boost::iequals(it->second, "TRUE")) {
156  return true;
157  } else if (boost::iequals(it->second, "FALSE")) {
158  return false;
159  } else {
160  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
161  "\" foreign table option. "
162  "Value must be either 'true' or 'false'."};
163  }
164  }
165  return std::nullopt;
166 }
167 
168 namespace {
169 std::set<const ColumnDescriptor*> get_columns(
170  const std::map<ChunkKey, AbstractBuffer*>& buffers,
171  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
172  const int32_t table_id,
173  const int fragment_id) {
174  CHECK(!buffers.empty());
175  std::set<const ColumnDescriptor*> columns;
176  for (const auto& entry : buffers) {
177  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
178  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
179  const auto column = catalog->getMetadataForColumnUnlocked(table_id, column_id);
180  columns.emplace(column);
181  }
182  return columns;
183 }
184 } // namespace
185 
187  const std::set<const ColumnDescriptor*>& columns,
188  const int fragment_id,
189  const std::map<ChunkKey, AbstractBuffer*>& buffers,
190  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
191  for (const auto column : columns) {
192  ChunkKey data_chunk_key;
193  AbstractBuffer* data_buffer = nullptr;
194  AbstractBuffer* index_buffer = nullptr;
195  if (column->columnType.is_varlen_indeed()) {
196  data_chunk_key = {
197  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 1};
198  ChunkKey index_chunk_key = {
199  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 2};
200 
201  CHECK(buffers.find(data_chunk_key) != buffers.end());
202  CHECK(buffers.find(index_chunk_key) != buffers.end());
203 
204  data_buffer = buffers.find(data_chunk_key)->second;
205  index_buffer = buffers.find(index_chunk_key)->second;
206 
207  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
208  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
209 
210  size_t index_offset_size{0};
211  if (column->columnType.is_string() || column->columnType.is_geometry()) {
212  index_offset_size = sizeof(StringOffsetT);
213  } else if (column->columnType.is_array()) {
214  index_offset_size = sizeof(ArrayOffsetT);
215  } else {
216  UNREACHABLE();
217  }
218  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
219  index_buffer->reserve(index_offset_size *
220  (chunk_metadata_map_[data_chunk_key]->numElements + 1));
221  } else {
222  data_chunk_key = {db_id_, foreign_table_->tableId, column->columnId, fragment_id};
223  CHECK(buffers.find(data_chunk_key) != buffers.end());
224  data_buffer = buffers.find(data_chunk_key)->second;
225  }
226  data_buffer->reserve(chunk_metadata_map_[data_chunk_key]->numBytes);
227  column_id_to_chunk_map[column->columnId] = Chunk_NS::Chunk{column};
228  column_id_to_chunk_map[column->columnId].setBuffer(data_buffer);
229  column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
230  column_id_to_chunk_map[column->columnId].initEncoder();
231  }
232 }
233 
235  std::map<ChunkKey, AbstractBuffer*>& required_buffers,
236  std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
237  auto timer = DEBUG_TIMER(__func__);
238  auto catalog = Catalog_Namespace::Catalog::get(db_id_);
239  CHECK(catalog);
240  CHECK(!required_buffers.empty());
241 
242  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
243  std::set<const ColumnDescriptor*> required_columns =
244  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
245  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
247  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
248 
249  if (!optional_buffers.empty()) {
250  std::set<const ColumnDescriptor*> optional_columns;
251  optional_columns =
252  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
254  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
255  }
256  populateChunks(column_id_to_chunk_map, fragment_id);
257 
258  for (auto& entry : column_id_to_chunk_map) {
259  entry.second.setBuffer(nullptr);
260  entry.second.setIndexBuffer(nullptr);
261  }
262 }
263 
268  size_t file_offset;
269  size_t row_count;
270  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
271 
272  bool operator<(const ParseFileRegionResult& other) const {
273  return file_offset < other.file_offset;
274  }
275 };
276 
282  const FileRegions& file_regions,
283  const size_t start_index,
284  const size_t end_index,
285  CsvReader& csv_reader,
286  std::mutex& file_access_mutex,
287  csv_file_buffer_parser::ParseBufferRequest& parse_file_request,
288  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
289  ParseFileRegionResult load_file_region_result{};
290  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
291  load_file_region_result.row_count = 0;
292 
294  for (size_t i = start_index; i <= end_index; i++) {
295  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
296  size_t read_size;
297  {
298  std::lock_guard<std::mutex> lock(file_access_mutex);
299  read_size = csv_reader.readRegion(parse_file_request.buffer.get(),
300  file_regions[i].first_row_file_offset,
301  file_regions[i].region_size);
302  }
303 
304  CHECK_EQ(file_regions[i].region_size, read_size);
305  parse_file_request.begin_pos = 0;
306  parse_file_request.end_pos = file_regions[i].region_size;
307  parse_file_request.first_row_index = file_regions[i].first_row_index;
308  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
309  parse_file_request.process_row_count = file_regions[i].row_count;
310 
311  result = parse_buffer(parse_file_request);
312  CHECK_EQ(file_regions[i].row_count, result.row_count);
313  load_file_region_result.row_count += result.row_count;
314  }
315  load_file_region_result.column_id_to_data_blocks_map =
317  return load_file_region_result;
318 }
319 
323 size_t get_buffer_size(const import_export::CopyParams& copy_params,
324  const bool size_known,
325  const size_t file_size) {
326  size_t buffer_size = copy_params.buffer_size;
327  if (size_known && file_size < buffer_size) {
328  buffer_size = file_size + 1; // +1 for end of line character, if missing
329  }
330  return buffer_size;
331 }
332 
333 size_t get_buffer_size(const FileRegions& file_regions) {
334  size_t buffer_size = 0;
335  for (const auto& file_region : file_regions) {
336  buffer_size = std::max(buffer_size, file_region.region_size);
337  }
338  CHECK(buffer_size);
339  return buffer_size;
340 }
341 
346 size_t get_thread_count(const import_export::CopyParams& copy_params,
347  const bool size_known,
348  const size_t file_size,
349  const size_t buffer_size) {
350  size_t thread_count = copy_params.threads;
351  if (thread_count == 0) {
352  thread_count = std::thread::hardware_concurrency();
353  }
354  if (size_known) {
355  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
356  if (num_buffers_in_file < thread_count) {
357  thread_count = num_buffers_in_file;
358  }
359  }
360  CHECK(thread_count);
361  return thread_count;
362 }
363 
364 size_t get_thread_count(const import_export::CopyParams& copy_params,
365  const FileRegions& file_regions) {
366  size_t thread_count = copy_params.threads;
367  if (thread_count == 0) {
368  thread_count =
369  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
370  }
371  CHECK(thread_count);
372  return thread_count;
373 }
374 
379  const std::list<const ColumnDescriptor*>& columns,
380  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
381  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers) {
382  for (const auto column : columns) {
383  StringDictionary* string_dictionary = nullptr;
384  if (column->columnType.is_dict_encoded_string() ||
385  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
386  column->columnType.get_compression() == kENCODING_DICT)) {
387  auto dict_descriptor =
388  catalog->getMetadataForDictUnlocked(column->columnType.get_comp_param(), true);
389  string_dictionary = dict_descriptor->stringDict.get();
390  }
391  import_buffers.emplace_back(
392  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
393  }
394 }
395 
397  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
398  int fragment_id) {
399  const auto copy_params = validateAndGetCopyParams();
400 
401  CHECK(!column_id_to_chunk_map.empty());
402  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
403  CHECK(!file_regions.empty());
404 
405  const auto buffer_size = get_buffer_size(file_regions);
406  const auto thread_count = get_thread_count(copy_params, file_regions);
407 
408  auto catalog = Catalog_Namespace::Catalog::get(db_id_);
409  CHECK(catalog);
410  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
411  foreign_table_->tableId, false, false, true);
412  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
413 
414  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
415  parse_file_requests.reserve(thread_count);
416  std::vector<std::future<ParseFileRegionResult>> futures{};
417 
418  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
419  parse_file_requests.emplace_back();
420  csv_file_buffer_parser::ParseBufferRequest& parse_file_request =
421  parse_file_requests.back();
422  parse_file_request.buffer = std::make_unique<char[]>(buffer_size);
423  parse_file_request.buffer_size = buffer_size;
424  parse_file_request.buffer_alloc_size = buffer_size;
425  parse_file_request.copy_params = copy_params;
426  parse_file_request.columns = columns;
427  parse_file_request.catalog = catalog;
428  initialize_import_buffers(columns, catalog, parse_file_request.import_buffers);
429 
430  auto start_index = i;
431  auto end_index =
432  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
433  futures.emplace_back(std::async(std::launch::async,
435  std::ref(file_regions),
436  start_index,
437  end_index,
438  std::ref((*csv_reader_)),
439  std::ref(file_access_mutex_),
440  std::ref(parse_file_request),
441  std::ref(column_id_to_chunk_map)));
442  }
443 
444  std::set<ParseFileRegionResult> load_file_region_results{};
445  for (auto& future : futures) {
446  future.wait();
447  load_file_region_results.emplace(future.get());
448  }
449 
450  for (auto result : load_file_region_results) {
451  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
452  chunk.appendData(
453  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
454  }
455  }
456 }
457 
464 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
465  const size_t max_fragment_size,
466  const size_t buffer_row_count) {
467  CHECK(buffer_row_count > 0);
468  std::vector<size_t> partitions{};
469  size_t remaining_rows_in_last_fragment;
470  if (start_row_index % max_fragment_size == 0) {
471  remaining_rows_in_last_fragment = 0;
472  } else {
473  remaining_rows_in_last_fragment =
474  max_fragment_size - (start_row_index % max_fragment_size);
475  }
476  if (buffer_row_count <= remaining_rows_in_last_fragment) {
477  partitions.emplace_back(buffer_row_count);
478  } else {
479  if (remaining_rows_in_last_fragment > 0) {
480  partitions.emplace_back(remaining_rows_in_last_fragment);
481  }
482  size_t remaining_buffer_row_count =
483  buffer_row_count - remaining_rows_in_last_fragment;
484  while (remaining_buffer_row_count > 0) {
485  partitions.emplace_back(
486  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
487  remaining_buffer_row_count -= partitions.back();
488  }
489  }
490  return partitions;
491 }
492 
499  std::queue<csv_file_buffer_parser::ParseBufferRequest> pending_requests;
501  std::condition_variable pending_requests_condition;
502  std::queue<csv_file_buffer_parser::ParseBufferRequest> request_pool;
503  std::mutex request_pool_mutex;
504  std::condition_variable request_pool_condition;
506  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
508  std::map<ChunkKey, size_t> chunk_byte_count;
510 };
511 
516 std::optional<csv_file_buffer_parser::ParseBufferRequest> get_next_metadata_scan_request(
517  MetadataScanMultiThreadingParams& multi_threading_params) {
518  std::unique_lock<std::mutex> pending_requests_lock(
519  multi_threading_params.pending_requests_mutex);
520  multi_threading_params.pending_requests_condition.wait(
521  pending_requests_lock, [&multi_threading_params] {
522  return !multi_threading_params.pending_requests.empty() ||
523  !multi_threading_params.continue_processing;
524  });
525  if (multi_threading_params.pending_requests.empty()) {
526  return {};
527  }
528  auto request = std::move(multi_threading_params.pending_requests.front());
529  multi_threading_params.pending_requests.pop();
530  pending_requests_lock.unlock();
531  multi_threading_params.pending_requests_condition.notify_all();
532  return std::move(request);
533 }
534 
539 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
540  std::mutex& file_region_mutex,
541  int fragment_id,
542  size_t first_row_index,
544  FileRegion file_region;
545  file_region.first_row_file_offset = result.row_offsets.front();
546  file_region.region_size = result.row_offsets.back() - file_region.first_row_file_offset;
547  file_region.first_row_index = first_row_index;
548  file_region.row_count = result.row_count;
549 
550  {
551  std::lock_guard<std::mutex> lock(file_region_mutex);
552  fragment_id_to_file_regions_map[fragment_id].emplace_back(file_region);
553  }
554 }
555 
561  SQLTypeInfo sql_type_info) {
562  CHECK(sql_type_info.is_varlen());
563  size_t byte_count = 0;
564  if (sql_type_info.is_string() || sql_type_info.is_geometry()) {
565  for (const auto& str : *data_block.stringsPtr) {
566  byte_count += str.length();
567  }
568  } else if (sql_type_info.is_array()) {
569  for (const auto& array : *data_block.arraysPtr) {
570  byte_count += array.length;
571  }
572  } else {
573  UNREACHABLE();
574  }
575  return byte_count;
576 }
577 
582 void update_stats(Encoder* encoder,
583  const SQLTypeInfo& column_type,
584  DataBlockPtr data_block,
585  const size_t row_count) {
586  if (column_type.is_array()) {
587  encoder->updateStats(data_block.arraysPtr, 0, row_count);
588  } else if (!column_type.is_varlen()) {
589  encoder->updateStats(data_block.numbersPtr, row_count);
590  } else {
591  encoder->updateStats(data_block.stringsPtr, 0, row_count);
592  }
593 }
594 
600  int fragment_id,
603  std::map<int, const ColumnDescriptor*>& column_by_id) {
604  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
605  ChunkKey chunk_key{request.db_id, request.table_id, column_id, fragment_id};
606  const auto column = column_by_id[column_id];
607  size_t byte_count;
608  if (column->columnType.is_varlen_indeed()) {
609  chunk_key.emplace_back(1);
610  byte_count = get_var_length_data_block_size(data_block, column->columnType);
611  } else {
612  byte_count = column->columnType.get_size() * result.row_count;
613  }
614 
615  {
616  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_byte_count_mutex);
617  multi_threading_params.chunk_byte_count[chunk_key] += byte_count;
618  }
619 
620  {
621  std::lock_guard<std::mutex> lock(
622  multi_threading_params.chunk_encoder_buffers_mutex);
623  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
624  multi_threading_params.chunk_encoder_buffers.end()) {
625  multi_threading_params.chunk_encoder_buffers[chunk_key] =
626  std::make_unique<ForeignStorageBuffer>();
627  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
628  column->columnType);
629  }
630  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->encoder.get(),
631  column->columnType,
632  data_block,
633  result.row_count);
634  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
635  ->encoder->getNumElems() +
636  result.row_count;
637  multi_threading_params.chunk_encoder_buffers[chunk_key]->encoder->setNumElems(
638  num_elements);
639  }
640  }
641 }
642 
649  std::unique_lock<std::mutex> completed_requests_queue_lock(
650  multi_threading_params.request_pool_mutex);
651  multi_threading_params.request_pool.emplace(std::move(request));
652  completed_requests_queue_lock.unlock();
653  multi_threading_params.request_pool_condition.notify_all();
654 }
655 
660 void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
661  std::map<int, FileRegions>& fragment_id_to_file_regions_map,
662  std::mutex& file_region_mutex) {
663  std::map<int, const ColumnDescriptor*> column_by_id{};
664  while (true) {
665  auto request_opt = get_next_metadata_scan_request(multi_threading_params);
666  if (!request_opt.has_value()) {
667  break;
668  }
669  auto& request = request_opt.value();
670  if (column_by_id.empty()) {
671  for (const auto column : request.columns) {
672  column_by_id[column->columnId] = column;
673  }
674  }
675  auto partitions = partition_by_fragment(
676  request.first_row_index, request.max_fragment_rows, request.buffer_row_count);
677  request.begin_pos = 0;
678  size_t row_index = request.first_row_index;
679  for (const auto partition : partitions) {
680  request.process_row_count = partition;
681  for (const auto& import_buffer : request.import_buffers) {
682  import_buffer->clear();
683  }
684  auto result = parse_buffer(request);
685  int fragment_id = row_index / request.max_fragment_rows;
686  add_file_region(fragment_id_to_file_regions_map,
687  file_region_mutex,
688  fragment_id,
689  request.first_row_index,
690  result);
691  update_metadata(multi_threading_params, fragment_id, request, result, column_by_id);
692  row_index += result.row_count;
693  request.begin_pos = result.row_offsets.back() - request.file_offset;
694  }
695  add_request_to_pool(multi_threading_params, request);
696  }
697 }
698 
703  MetadataScanMultiThreadingParams& multi_threading_params) {
704  std::unique_lock<std::mutex> request_pool_lock(
705  multi_threading_params.request_pool_mutex);
706  multi_threading_params.request_pool_condition.wait(
707  request_pool_lock,
708  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
709  auto request = std::move(multi_threading_params.request_pool.front());
710  multi_threading_params.request_pool.pop();
711  request_pool_lock.unlock();
712  CHECK(request.buffer);
713  return request;
714 }
715 
721  MetadataScanMultiThreadingParams& multi_threading_params,
723  std::unique_lock<std::mutex> pending_requests_lock(
724  multi_threading_params.pending_requests_mutex);
725  multi_threading_params.pending_requests.emplace(std::move(request));
726  pending_requests_lock.unlock();
727  multi_threading_params.pending_requests_condition.notify_all();
728 }
729 
734 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
735  size_t& buffer_size,
736  const size_t alloc_size) {
737  CHECK_LE(buffer_size, alloc_size);
738  if (buffer_size < alloc_size) {
739  buffer = std::make_unique<char[]>(alloc_size);
740  buffer_size = alloc_size;
741  }
742 }
743 
749  const size_t& buffer_size,
750  const std::string& file_path,
751  CsvReader& csv_reader,
752  const import_export::CopyParams& copy_params,
753  MetadataScanMultiThreadingParams& multi_threading_params,
754  size_t& first_row_index_in_buffer,
755  size_t& current_file_offset) {
756  auto alloc_size = buffer_size;
757  auto residual_buffer = std::make_unique<char[]>(alloc_size);
758  size_t residual_buffer_size = 0;
759  size_t residual_buffer_alloc_size = alloc_size;
760 
761  while (!csv_reader.isScanFinished()) {
762  auto request = get_request_from_pool(multi_threading_params);
763  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
764 
765  if (residual_buffer_size > 0) {
766  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
767  }
768  size_t size = residual_buffer_size;
769  size += csv_reader.read(request.buffer.get() + residual_buffer_size,
770  alloc_size - residual_buffer_size);
771 
772  if (size == 0) {
773  // In some cases at the end of a file we will read 0 bytes even when
774  // csv_reader.isScanFinished() is false
775  continue;
776  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
777  // In some cases files with newlines at the end will be encoded with a second
778  // newline that can end up being the only thing in the buffer
779  current_file_offset++;
780  continue;
781  }
782  unsigned int num_rows_in_buffer = 0;
783  request.end_pos =
785  request.buffer,
786  size,
787  copy_params,
788  first_row_index_in_buffer,
789  num_rows_in_buffer,
790  nullptr,
791  &csv_reader);
792  request.buffer_size = size;
793  request.buffer_alloc_size = alloc_size;
794  request.first_row_index = first_row_index_in_buffer;
795  request.file_offset = current_file_offset;
796  request.buffer_row_count = num_rows_in_buffer;
797 
798  residual_buffer_size = size - request.end_pos;
799  if (residual_buffer_size > 0) {
800  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
801  memcpy(residual_buffer.get(),
802  request.buffer.get() + request.end_pos,
803  residual_buffer_size);
804  }
805 
806  current_file_offset += request.end_pos;
807  first_row_index_in_buffer += num_rows_in_buffer;
808 
809  dispatch_metadata_scan_request(multi_threading_params, request);
810  }
811 
812  std::unique_lock<std::mutex> pending_requests_queue_lock(
813  multi_threading_params.pending_requests_mutex);
814  multi_threading_params.pending_requests_condition.wait(
815  pending_requests_queue_lock, [&multi_threading_params] {
816  return multi_threading_params.pending_requests.empty();
817  });
818  multi_threading_params.continue_processing = false;
819  pending_requests_queue_lock.unlock();
820  multi_threading_params.pending_requests_condition.notify_all();
821 }
822 
836  auto timer = DEBUG_TIMER(__func__);
837  chunk_metadata_map_.clear();
838 
839  auto update_mode_it = foreign_table_->options.find("UPDATE_MODE");
840  bool append_mode = (update_mode_it != foreign_table_->options.end()) &&
841  boost::iequals(update_mode_it->second, "APPEND");
842 
843  const auto copy_params = validateAndGetCopyParams();
844  const auto file_path = getFilePath();
845 
846  if (append_mode && csv_reader_ != nullptr) {
847  csv_reader_->checkForMoreRows(append_start_offset_);
848  } else {
850  auto& server_options = foreign_table_->foreign_server->options;
851  if (server_options.find(ForeignServer::STORAGE_TYPE_KEY)->second ==
853  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
854  } else {
855  UNREACHABLE();
856  }
857  num_rows_ = 0;
859  }
860 
861  auto catalog = Catalog_Namespace::Catalog::get(db_id_);
862  CHECK(catalog);
863  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
864  foreign_table_->tableId, false, false, true);
865  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
866  for (auto column : columns) {
867  column_by_id[column->columnId] = column;
868  }
869  MetadataScanMultiThreadingParams multi_threading_params;
870 
871  // Restore previous chunk data
872  if (append_mode) {
873  multi_threading_params.chunk_byte_count = chunk_byte_count_;
874  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
875  }
876 
877  if (!csv_reader_->isScanFinished()) {
878  auto buffer_size = get_buffer_size(copy_params,
879  csv_reader_->isRemainingSizeKnown(),
880  csv_reader_->getRemainingSize());
881  auto thread_count = get_thread_count(copy_params,
882  csv_reader_->isRemainingSizeKnown(),
883  csv_reader_->getRemainingSize(),
884  buffer_size);
885  multi_threading_params.continue_processing = true;
886 
887  std::vector<std::future<void>> futures{};
888  for (size_t i = 0; i < thread_count; i++) {
889  futures.emplace_back(std::async(std::launch::async,
891  std::ref(multi_threading_params),
893  std::ref(file_regions_mutex_)));
894 
895  multi_threading_params.request_pool.emplace();
896  csv_file_buffer_parser::ParseBufferRequest& parse_buffer_request =
897  multi_threading_params.request_pool.back();
898  initialize_import_buffers(columns, catalog, parse_buffer_request.import_buffers);
899  parse_buffer_request.copy_params = copy_params;
900  parse_buffer_request.columns = columns;
901  parse_buffer_request.catalog = catalog;
902  parse_buffer_request.db_id = db_id_;
903  parse_buffer_request.table_id = foreign_table_->tableId;
904  parse_buffer_request.max_fragment_rows = foreign_table_->maxFragRows;
905  parse_buffer_request.buffer = std::make_unique<char[]>(buffer_size);
906  parse_buffer_request.buffer_size = buffer_size;
907  parse_buffer_request.buffer_alloc_size = buffer_size;
908  }
909 
910  try {
912  file_path,
913  (*csv_reader_),
914  copy_params,
915  multi_threading_params,
916  num_rows_,
918  } catch (...) {
919  {
920  std::unique_lock<std::mutex> pending_requests_lock(
921  multi_threading_params.pending_requests_mutex);
922  multi_threading_params.continue_processing = false;
923  }
924  multi_threading_params.pending_requests_condition.notify_all();
925  throw;
926  }
927 
928  for (auto& future : futures) {
929  // get() instead of wait() because we need to propagate potential exceptions.
930  future.get();
931  }
932  }
933 
934  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
935  auto chunk_metadata =
936  buffer->encoder->getMetadata(column_by_id[chunk_key[2]]->columnType);
937  chunk_metadata->numElements = buffer->encoder->getNumElems();
938  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
939  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
940  chunk_metadata_map_[chunk_key] = chunk_metadata;
941  }
942 
943  // Save chunk data
944  if (append_mode) {
945  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
946  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
947  }
948 
949  for (auto& entry : fragment_id_to_file_regions_map_) {
950  std::sort(entry.second.begin(), entry.second.end());
951  }
952 }
953 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t get_var_length_data_block_size(DataBlockPtr data_block, SQLTypeInfo sql_type_info)
std::vector< std::unique_ptr< import_export::TypedImportBuffer > > import_buffers
std::map< ChunkKey, size_t > chunk_byte_count_
std::map< std::string, std::string, std::less<> > options
bool is_array() const
Definition: sqltypes.h:424
bool is_string() const
Definition: sqltypes.h:416
void populateChunkMapForColumns(const std::set< const ColumnDescriptor *> &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer *> &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:150
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:151
static void validateOptions(const ForeignTable *foreign_table)
static std::shared_ptr< Catalog > get(const std::string &dbName)
Definition: Catalog.cpp:3671
static constexpr std::array< char const *, 1 > supported_options
Definition: ForeignTable.h:27
virtual size_t size() const =0
virtual size_t read(void *buffer, size_t max_size)=0
ParseBufferResult parse_buffer(ParseBufferRequest &request)
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, std::mutex &file_access_mutex, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void update_metadata(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, const csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor *> &column_by_id)
#define UNREACHABLE()
Definition: Logger.h:241
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
bool is_varlen() const
Definition: sqltypes.h:431
std::unique_ptr< CsvReader > csv_reader_
std::shared_ptr< Catalog_Namespace::Catalog > catalog
std::string to_string(char const *&&v)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:108
int32_t StringOffsetT
Definition: sqltypes.h:867
static constexpr std::string_view STORAGE_TYPE_KEY
Definition: ForeignServer.h:43
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer *> &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
std::vector< FileRegion > FileRegions
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
void initialize_import_buffers(const std::list< const ColumnDescriptor *> &columns, std::shared_ptr< Catalog_Namespace::Catalog > catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
std::string validateAndGetStringWithLength(const std::string &option_name, const size_t expected_num_chars)
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
std::map< int, FileRegions > fragment_id_to_file_regions_map_
An AbstractBuffer is a unit of data management for a data manager.
std::queue< csv_file_buffer_parser::ParseBufferRequest > request_pool
void validate_allowed_file_path(const std::string &file_path, const DataTransferType data_transfer_type, const bool allow_wildcards)
Definition: DdlUtils.cpp:611
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:40
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result)
bool is_geometry() const
Definition: sqltypes.h:428
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
CsvDataWrapper(const int db_id, const ForeignTable *foreign_table)
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::optional< bool > validateAndGetBoolValue(const std::string &option_name)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static constexpr std::string_view LOCAL_FILE_STORAGE_TYPE
Definition: ForeignServer.h:45
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
int32_t ArrayOffsetT
Definition: sqltypes.h:868
void populateChunkBuffers(std::map< ChunkKey, AbstractBuffer *> &required_buffers, std::map< ChunkKey, AbstractBuffer *> &optional_buffers) override
static constexpr std::array< char const *, 14 > supported_options_
std::queue< csv_file_buffer_parser::ParseBufferRequest > pending_requests
virtual bool isScanFinished()=0
#define IS_STRING(T)
Definition: sqltypes.h:173
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:26
static constexpr std::string_view BASE_PATH_KEY
Definition: ForeignServer.h:44
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:39
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
std::vector< int > ChunkKey
Definition: types.h:35
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex)
int8_t * numbersPtr
Definition: sqltypes.h:149
virtual void updateStats(const int64_t val, const bool is_null)=0
virtual void reserve(size_t num_bytes)=0
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
import_export::CopyParams validateAndGetCopyParams()
bool operator<(const ParseFileRegionResult &other) const
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)