OmniSciDB  a575cb28ea
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "CsvDataWrapper.h"
18 
19 #include <algorithm>
20 #include <condition_variable>
21 #include <mutex>
22 #include <queue>
23 
24 #include <rapidjson/document.h>
25 #include <boost/filesystem.hpp>
26 
27 #include "CsvShared.h"
31 #include "FsiJsonUtils.h"
33 #include "ImportExport/Importer.h"
34 #include "Shared/sqltypes.h"
35 #include "Utils/DdlUtils.h"
36 
37 namespace foreign_storage {
38 CsvDataWrapper::CsvDataWrapper() : db_id_(-1), foreign_table_(nullptr) {}
39 
// Constructs a wrapper bound to the given database id and foreign table.
// Starts in a non-restored state (is_restored_ == false).
CsvDataWrapper::CsvDataWrapper(const int db_id, const ForeignTable* foreign_table)
    : db_id_(db_id), foreign_table_(foreign_table), is_restored_(false) {}
42 
// Validates the foreign table's options by delegating to the shared CSV
// option-validation helper.
void CsvDataWrapper::validateTableOptions(const ForeignTable* foreign_table) const {
  Csv::validate_options(foreign_table);
}
47 const std::set<std::string_view>& CsvDataWrapper::getSupportedTableOptions() const {
48  static const auto supported_table_options = getAllCsvTableOptions();
49  return supported_table_options;
50 }
51 
52 std::set<std::string_view> CsvDataWrapper::getAllCsvTableOptions() const {
53  std::set<std::string_view> supported_table_options(
56  supported_table_options.insert(csv_table_options_.begin(), csv_table_options_.end());
57  return supported_table_options;
58 }
59 
60 namespace {
61 std::set<const ColumnDescriptor*> get_columns(
62  const std::map<ChunkKey, AbstractBuffer*>& buffers,
63  std::shared_ptr<Catalog_Namespace::Catalog> catalog,
64  const int32_t table_id,
65  const int fragment_id) {
66  CHECK(!buffers.empty());
67  std::set<const ColumnDescriptor*> columns;
68  for (const auto& entry : buffers) {
69  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
70  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
71  const auto column = catalog->getMetadataForColumnUnlocked(table_id, column_id);
72  columns.emplace(column);
73  }
74  return columns;
75 }
76 } // namespace
77 
namespace {
// Returns true for columns whose metadata is not collected during the scan.
// Dictionary-encoded columns are skipped; placeholder metadata is created for
// them later (see the placeholder-metadata logic used by populateChunkMetadata).
bool skip_metadata_scan(const ColumnDescriptor* column) {
  return column->columnType.is_dict_encoded_type();
}
}  // namespace
83 
85  const std::set<const ColumnDescriptor*>& columns,
86  const int fragment_id,
87  const std::map<ChunkKey, AbstractBuffer*>& buffers,
88  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
89  for (const auto column : columns) {
90  ChunkKey data_chunk_key;
91  AbstractBuffer* data_buffer = nullptr;
92  AbstractBuffer* index_buffer = nullptr;
93  if (column->columnType.is_varlen_indeed()) {
94  data_chunk_key = {
95  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 1};
96  ChunkKey index_chunk_key = {
97  db_id_, foreign_table_->tableId, column->columnId, fragment_id, 2};
98 
99  CHECK(buffers.find(data_chunk_key) != buffers.end());
100  CHECK(buffers.find(index_chunk_key) != buffers.end());
101 
102  data_buffer = buffers.find(data_chunk_key)->second;
103  index_buffer = buffers.find(index_chunk_key)->second;
104  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
105  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
106 
107  size_t index_offset_size{0};
108  if (column->columnType.is_string() || column->columnType.is_geometry()) {
109  index_offset_size = sizeof(StringOffsetT);
110  } else if (column->columnType.is_array()) {
111  index_offset_size = sizeof(ArrayOffsetT);
112  } else {
113  UNREACHABLE();
114  }
115  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
116  index_buffer->reserve(index_offset_size *
117  (chunk_metadata_map_[data_chunk_key]->numElements + 1));
118  } else {
119  data_chunk_key = {db_id_, foreign_table_->tableId, column->columnId, fragment_id};
120  CHECK(buffers.find(data_chunk_key) != buffers.end());
121  data_buffer = buffers.find(data_chunk_key)->second;
122  }
123  data_buffer->reserve(chunk_metadata_map_[data_chunk_key]->numBytes);
124  column_id_to_chunk_map[column->columnId] = Chunk_NS::Chunk{column};
125  column_id_to_chunk_map[column->columnId].setBuffer(data_buffer);
126  column_id_to_chunk_map[column->columnId].setIndexBuffer(index_buffer);
127  column_id_to_chunk_map[column->columnId].initEncoder();
128  }
129 }
130 
132  std::map<ChunkKey, AbstractBuffer*>& required_buffers,
133  std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
134  auto timer = DEBUG_TIMER(__func__);
136  CHECK(catalog);
137  CHECK(!required_buffers.empty());
138 
139  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
140  std::set<const ColumnDescriptor*> required_columns =
141  get_columns(required_buffers, catalog, foreign_table_->tableId, fragment_id);
142  std::map<int, Chunk_NS::Chunk> column_id_to_chunk_map;
144  required_columns, fragment_id, required_buffers, column_id_to_chunk_map);
145 
146  if (!optional_buffers.empty()) {
147  std::set<const ColumnDescriptor*> optional_columns;
148  optional_columns =
149  get_columns(optional_buffers, catalog, foreign_table_->tableId, fragment_id);
151  optional_columns, fragment_id, optional_buffers, column_id_to_chunk_map);
152  }
153  populateChunks(column_id_to_chunk_map, fragment_id);
154  updateMetadata(column_id_to_chunk_map, fragment_id);
155  for (auto& entry : column_id_to_chunk_map) {
156  entry.second.setBuffer(nullptr);
157  entry.second.setIndexBuffer(nullptr);
158  }
159 }
160 
161 // if column was skipped during scan, update metadata now
163  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
164  int fragment_id) {
165  auto fragmenter = foreign_table_->fragmenter;
166  if (fragmenter) {
168  CHECK(catalog);
169  for (auto& entry : column_id_to_chunk_map) {
170  const auto& column =
171  catalog->getMetadataForColumnUnlocked(foreign_table_->tableId, entry.first);
172  if (skip_metadata_scan(column)) {
173  ChunkKey data_chunk_key = {
174  db_id_, foreign_table_->tableId, column->columnId, fragment_id};
175  if (column->columnType.is_varlen_indeed()) {
176  data_chunk_key.emplace_back(1);
177  }
178  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
179  auto cached_metadata = chunk_metadata_map_[data_chunk_key];
180  auto chunk_metadata =
181  entry.second.getBuffer()->getEncoder()->getMetadata(column->columnType);
182  cached_metadata->chunkStats.max = chunk_metadata->chunkStats.max;
183  cached_metadata->chunkStats.min = chunk_metadata->chunkStats.min;
184  cached_metadata->numBytes = entry.second.getBuffer()->size();
185  fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
186  }
187  }
188  }
189 }
190 
196  size_t file_offset;
197  size_t row_count;
198  std::map<int, DataBlockPtr> column_id_to_data_blocks_map;
199 
200  bool operator<(const ParseFileRegionResult& other) const {
201  return file_offset < other.file_offset;
202  }
203 };
204 
210  const FileRegions& file_regions,
211  const size_t start_index,
212  const size_t end_index,
213  CsvReader& csv_reader,
214  csv_file_buffer_parser::ParseBufferRequest& parse_file_request,
215  const std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map) {
216  ParseFileRegionResult load_file_region_result{};
217  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
218  load_file_region_result.row_count = 0;
219 
221  for (size_t i = start_index; i <= end_index; i++) {
222  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
223  size_t read_size;
224  {
225  read_size = csv_reader.readRegion(parse_file_request.buffer.get(),
226  file_regions[i].first_row_file_offset,
227  file_regions[i].region_size);
228  }
229 
230  CHECK_EQ(file_regions[i].region_size, read_size);
231  parse_file_request.begin_pos = 0;
232  parse_file_request.end_pos = file_regions[i].region_size;
233  parse_file_request.first_row_index = file_regions[i].first_row_index;
234  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
235  parse_file_request.process_row_count = file_regions[i].row_count;
236 
237  result = parse_buffer(parse_file_request, i == end_index);
238  CHECK_EQ(file_regions[i].row_count, result.row_count);
239  load_file_region_result.row_count += result.row_count;
240  }
241  load_file_region_result.column_id_to_data_blocks_map =
243  return load_file_region_result;
244 }
245 
249 size_t get_buffer_size(const import_export::CopyParams& copy_params,
250  const bool size_known,
251  const size_t file_size) {
252  size_t buffer_size = copy_params.buffer_size;
253  if (size_known && file_size < buffer_size) {
254  buffer_size = file_size + 1; // +1 for end of line character, if missing
255  }
256  return buffer_size;
257 }
258 
259 size_t get_buffer_size(const FileRegions& file_regions) {
260  size_t buffer_size = 0;
261  for (const auto& file_region : file_regions) {
262  buffer_size = std::max(buffer_size, file_region.region_size);
263  }
264  CHECK(buffer_size);
265  return buffer_size;
266 }
267 
272 size_t get_thread_count(const import_export::CopyParams& copy_params,
273  const bool size_known,
274  const size_t file_size,
275  const size_t buffer_size) {
276  size_t thread_count = copy_params.threads;
277  if (thread_count == 0) {
278  thread_count = std::thread::hardware_concurrency();
279  }
280  if (size_known) {
281  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
282  if (num_buffers_in_file < thread_count) {
283  thread_count = num_buffers_in_file;
284  }
285  }
286  CHECK(thread_count);
287  return thread_count;
288 }
289 
290 size_t get_thread_count(const import_export::CopyParams& copy_params,
291  const FileRegions& file_regions) {
292  size_t thread_count = copy_params.threads;
293  if (thread_count == 0) {
294  thread_count =
295  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
296  }
297  CHECK(thread_count);
298  return thread_count;
299 }
300 
302  std::map<int, Chunk_NS::Chunk>& column_id_to_chunk_map,
303  int fragment_id) {
304  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
305 
306  CHECK(!column_id_to_chunk_map.empty());
307  const auto& file_regions = fragment_id_to_file_regions_map_[fragment_id];
308  CHECK(!file_regions.empty());
309 
310  const auto buffer_size = get_buffer_size(file_regions);
311  const auto thread_count = get_thread_count(copy_params, file_regions);
312 
313  const int batch_size = (file_regions.size() + thread_count - 1) / thread_count;
314 
315  std::vector<csv_file_buffer_parser::ParseBufferRequest> parse_file_requests{};
316  parse_file_requests.reserve(thread_count);
317  std::vector<std::future<ParseFileRegionResult>> futures{};
318  std::set<int> column_filter_set;
319  for (const auto& pair : column_id_to_chunk_map) {
320  column_filter_set.insert(pair.first);
321  }
322 
323  std::vector<std::unique_ptr<CsvReader>> csv_readers;
324  rapidjson::Value reader_metadata(rapidjson::kObjectType);
325  rapidjson::Document d;
326  auto& server_options = foreign_table_->foreign_server->options;
327  csv_reader_->serialize(reader_metadata, d.GetAllocator());
328  const auto csv_file_path = getFullFilePath(foreign_table_);
329 
330  for (size_t i = 0; i < file_regions.size(); i += batch_size) {
331  parse_file_requests.emplace_back(buffer_size,
332  copy_params,
333  db_id_,
335  column_filter_set,
336  csv_file_path);
337  auto start_index = i;
338  auto end_index =
339  std::min<size_t>(start_index + batch_size - 1, file_regions.size() - 1);
340 
341  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
342  csv_readers.emplace_back(std::make_unique<LocalMultiFileReader>(
343  csv_file_path, copy_params, reader_metadata));
344  } else {
345  UNREACHABLE();
346  }
347 
348  futures.emplace_back(std::async(std::launch::async,
350  std::ref(file_regions),
351  start_index,
352  end_index,
353  std::ref(*(csv_readers.back())),
354  std::ref(parse_file_requests.back()),
355  std::ref(column_id_to_chunk_map)));
356  }
357 
358  std::vector<ParseFileRegionResult> load_file_region_results{};
359  for (auto& future : futures) {
360  future.wait();
361  load_file_region_results.emplace_back(future.get());
362  }
363 
364  for (auto result : load_file_region_results) {
365  for (auto& [column_id, chunk] : column_id_to_chunk_map) {
366  chunk.appendData(
367  result.column_id_to_data_blocks_map[column_id], result.row_count, 0);
368  }
369  }
370 }
371 
378 std::vector<size_t> partition_by_fragment(const size_t start_row_index,
379  const size_t max_fragment_size,
380  const size_t buffer_row_count) {
381  CHECK(buffer_row_count > 0);
382  std::vector<size_t> partitions{};
383  size_t remaining_rows_in_last_fragment;
384  if (start_row_index % max_fragment_size == 0) {
385  remaining_rows_in_last_fragment = 0;
386  } else {
387  remaining_rows_in_last_fragment =
388  max_fragment_size - (start_row_index % max_fragment_size);
389  }
390  if (buffer_row_count <= remaining_rows_in_last_fragment) {
391  partitions.emplace_back(buffer_row_count);
392  } else {
393  if (remaining_rows_in_last_fragment > 0) {
394  partitions.emplace_back(remaining_rows_in_last_fragment);
395  }
396  size_t remaining_buffer_row_count =
397  buffer_row_count - remaining_rows_in_last_fragment;
398  while (remaining_buffer_row_count > 0) {
399  partitions.emplace_back(
400  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
401  remaining_buffer_row_count -= partitions.back();
402  }
403  }
404  return partitions;
405 }
406 
413  std::queue<csv_file_buffer_parser::ParseBufferRequest> pending_requests;
415  std::condition_variable pending_requests_condition;
416  std::queue<csv_file_buffer_parser::ParseBufferRequest> request_pool;
417  std::mutex request_pool_mutex;
418  std::condition_variable request_pool_condition;
420  std::map<ChunkKey, std::unique_ptr<ForeignStorageBuffer>> chunk_encoder_buffers;
421  std::map<ChunkKey, Chunk_NS::Chunk> cached_chunks;
423  std::map<ChunkKey, size_t> chunk_byte_count;
425 };
426 
431 std::optional<csv_file_buffer_parser::ParseBufferRequest> get_next_metadata_scan_request(
432  MetadataScanMultiThreadingParams& multi_threading_params) {
433  std::unique_lock<std::mutex> pending_requests_lock(
434  multi_threading_params.pending_requests_mutex);
435  multi_threading_params.pending_requests_condition.wait(
436  pending_requests_lock, [&multi_threading_params] {
437  return !multi_threading_params.pending_requests.empty() ||
438  !multi_threading_params.continue_processing;
439  });
440  if (multi_threading_params.pending_requests.empty()) {
441  return {};
442  }
443  auto request = std::move(multi_threading_params.pending_requests.front());
444  multi_threading_params.pending_requests.pop();
445  pending_requests_lock.unlock();
446  multi_threading_params.pending_requests_condition.notify_all();
447  return std::move(request);
448 }
449 
454 void add_file_region(std::map<int, FileRegions>& fragment_id_to_file_regions_map,
455  int fragment_id,
456  size_t first_row_index,
458  const std::string& file_path) {
459  fragment_id_to_file_regions_map[fragment_id].emplace_back(
460  FileRegion(file_path,
461  result.row_offsets.front(),
462  first_row_index,
463  result.row_count,
464  result.row_offsets.back() - result.row_offsets.front()));
465 }
466 
472  SQLTypeInfo sql_type_info) {
473  CHECK(sql_type_info.is_varlen());
474  size_t byte_count = 0;
475  if (sql_type_info.is_string() || sql_type_info.is_geometry()) {
476  for (const auto& str : *data_block.stringsPtr) {
477  byte_count += str.length();
478  }
479  } else if (sql_type_info.is_array()) {
480  for (const auto& array : *data_block.arraysPtr) {
481  byte_count += array.length;
482  }
483  } else {
484  UNREACHABLE();
485  }
486  return byte_count;
487 }
488 
493 void update_stats(Encoder* encoder,
494  const SQLTypeInfo& column_type,
495  DataBlockPtr data_block,
496  const size_t row_count) {
497  if (column_type.is_array()) {
498  encoder->updateStats(data_block.arraysPtr, 0, row_count);
499  } else if (!column_type.is_varlen()) {
500  encoder->updateStats(data_block.numbersPtr, row_count);
501  } else {
502  encoder->updateStats(data_block.stringsPtr, 0, row_count);
503  }
504 }
505 namespace {
507  std::shared_ptr<Catalog_Namespace::Catalog>& catalog) {
508  if (catalog->getDataMgr()
509  .getPersistentStorageMgr()
510  ->getDiskCacheConfig()
511  .isEnabledForFSI()) {
512  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
513  } else {
514  return nullptr;
515  }
516 }
517 } // namespace
518 
519 // If cache is enabled, populate cached_chunks buffers with data blocks
520 void cache_blocks(std::map<ChunkKey, Chunk_NS::Chunk>& cached_chunks,
521  DataBlockPtr data_block,
522  size_t row_count,
523  ChunkKey& chunk_key,
524  const ColumnDescriptor* column,
525  bool is_first_block,
526  bool is_last_block) {
527  auto catalog =
529  CHECK(catalog);
530  auto cache = get_cache_if_enabled(catalog);
531  if (cache) {
532  ChunkKey index_key = {chunk_key[CHUNK_KEY_DB_IDX],
533  chunk_key[CHUNK_KEY_TABLE_IDX],
534  chunk_key[CHUNK_KEY_COLUMN_IDX],
535  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
536  2};
537  // Create actual data chunks to prepopulate cache
538  if (cached_chunks.find(chunk_key) == cached_chunks.end()) {
539  cached_chunks[chunk_key] = Chunk_NS::Chunk{column};
540  cached_chunks[chunk_key].setBuffer(
541  cache->getChunkBufferForPrecaching(chunk_key, is_first_block));
542  if (column->columnType.is_varlen_indeed()) {
543  cached_chunks[chunk_key].setIndexBuffer(
544  cache->getChunkBufferForPrecaching(index_key, is_first_block));
545  }
546  if (is_first_block) {
547  cached_chunks[chunk_key].initEncoder();
548  }
549  }
550  cached_chunks[chunk_key].appendData(data_block, row_count, 0);
551 
552  if (is_last_block) {
553  // cache the chunks now so they are tracked by eviction algorithm
554  std::vector<ChunkKey> key_to_cache{chunk_key};
555  if (column->columnType.is_varlen_indeed()) {
556  key_to_cache.push_back(index_key);
557  }
558  cache->cacheTableChunks(key_to_cache);
559  }
560  }
561 }
562 
569  int fragment_id,
572  std::map<int, const ColumnDescriptor*>& column_by_id,
573  std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
574  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_encoder_buffers_mutex);
575  // File regions should be added in same order as appendData
576  add_file_region(fragment_id_to_file_regions_map,
577  fragment_id,
578  request.first_row_index,
579  result,
580  request.getFilePath());
581 
582  for (auto& [column_id, data_block] : result.column_id_to_data_blocks_map) {
583  ChunkKey chunk_key{request.db_id, request.getTableId(), column_id, fragment_id};
584  const auto column = column_by_id[column_id];
585  size_t byte_count;
586  if (column->columnType.is_varlen_indeed()) {
587  chunk_key.emplace_back(1);
588  byte_count = get_var_length_data_block_size(data_block, column->columnType);
589  } else {
590  byte_count = column->columnType.get_size() * result.row_count;
591  }
592 
593  {
594  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_byte_count_mutex);
595  multi_threading_params.chunk_byte_count[chunk_key] += byte_count;
596  }
597 
598  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
599  multi_threading_params.chunk_encoder_buffers.end()) {
600  multi_threading_params.chunk_encoder_buffers[chunk_key] =
601  std::make_unique<ForeignStorageBuffer>();
602  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
603  column->columnType);
604  }
605  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder(),
606  column->columnType,
607  data_block,
608  result.row_count);
609  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
610  ->getEncoder()
611  ->getNumElems() +
612  result.row_count;
613  multi_threading_params.chunk_encoder_buffers[chunk_key]->getEncoder()->setNumElems(
614  num_elements);
615  cache_blocks(
616  multi_threading_params.cached_chunks,
617  data_block,
618  result.row_count,
619  chunk_key,
620  column,
621  (num_elements - result.row_count) == 0, // Is the first block added to this chunk
622  num_elements == request.getMaxFragRows() // Is the last block for this chunk
623  );
624  }
625 }
626 
633  std::unique_lock<std::mutex> completed_requests_queue_lock(
634  multi_threading_params.request_pool_mutex);
635  multi_threading_params.request_pool.emplace(std::move(request));
636  completed_requests_queue_lock.unlock();
637  multi_threading_params.request_pool_condition.notify_all();
638 }
639 
// Worker-thread entry point for the metadata scan. Repeatedly pulls pending
// parse requests from the shared queue, parses each buffer one fragment-sized
// partition at a time, and records the resulting metadata and file regions.
// Exits when get_next_metadata_scan_request returns an empty optional (no more
// requests and processing halted).
void scan_metadata(MetadataScanMultiThreadingParams& multi_threading_params,
                   std::map<int, FileRegions>& fragment_id_to_file_regions_map) {
  // Column-id -> descriptor lookup, populated lazily from the first request.
  std::map<int, const ColumnDescriptor*> column_by_id{};
  while (true) {
    auto request_opt = get_next_metadata_scan_request(multi_threading_params);
    if (!request_opt.has_value()) {
      break;
    }
    auto& request = request_opt.value();
    try {
      if (column_by_id.empty()) {
        for (const auto column : request.getColumns()) {
          column_by_id[column->columnId] = column;
        }
      }
      // Split the buffer so that no partition crosses a fragment boundary.
      auto partitions = partition_by_fragment(
          request.first_row_index, request.getMaxFragRows(), request.buffer_row_count);
      request.begin_pos = 0;
      size_t row_index = request.first_row_index;
      for (const auto partition : partitions) {
        request.process_row_count = partition;
        // Clear any state left over from the previous partition's parse.
        for (const auto& import_buffer : request.import_buffers) {
          if (import_buffer != nullptr) {
            import_buffer->clear();
          }
        }
        auto result = parse_buffer(request, true);
        int fragment_id = row_index / request.getMaxFragRows();
        process_data_blocks(multi_threading_params,
                            fragment_id,
                            request,
                            result,
                            column_by_id,
                            fragment_id_to_file_regions_map);
        row_index += result.row_count;
        // Advance past the rows just parsed for the next partition.
        request.begin_pos = result.row_offsets.back() - request.file_offset;
      }
    } catch (...) {
      // Re-add request to pool so we dont block any other threads
      {
        std::lock_guard<std::mutex> pending_requests_lock(
            multi_threading_params.pending_requests_mutex);
        multi_threading_params.continue_processing = false;
      }
      add_request_to_pool(multi_threading_params, request);
      throw;
    }
    // Return the request (and its buffer) to the pool for reuse.
    add_request_to_pool(multi_threading_params, request);
  }
}
694 
699  MetadataScanMultiThreadingParams& multi_threading_params) {
700  std::unique_lock<std::mutex> request_pool_lock(
701  multi_threading_params.request_pool_mutex);
702  multi_threading_params.request_pool_condition.wait(
703  request_pool_lock,
704  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
705  auto request = std::move(multi_threading_params.request_pool.front());
706  multi_threading_params.request_pool.pop();
707  request_pool_lock.unlock();
708  CHECK(request.buffer);
709  return request;
710 }
711 
717  MetadataScanMultiThreadingParams& multi_threading_params,
719  {
720  std::unique_lock<std::mutex> pending_requests_lock(
721  multi_threading_params.pending_requests_mutex);
722  multi_threading_params.pending_requests.emplace(std::move(request));
723  }
724  multi_threading_params.pending_requests_condition.notify_all();
725 }
726 
731 void resize_buffer_if_needed(std::unique_ptr<char[]>& buffer,
732  size_t& buffer_size,
733  const size_t alloc_size) {
734  CHECK_LE(buffer_size, alloc_size);
735  if (buffer_size < alloc_size) {
736  buffer = std::make_unique<char[]>(alloc_size);
737  buffer_size = alloc_size;
738  }
739 }
740 
746  const size_t& buffer_size,
747  const std::string& file_path,
748  CsvReader& csv_reader,
749  const import_export::CopyParams& copy_params,
750  MetadataScanMultiThreadingParams& multi_threading_params,
751  size_t& first_row_index_in_buffer,
752  size_t& current_file_offset) {
753  auto alloc_size = buffer_size;
754  auto residual_buffer = std::make_unique<char[]>(alloc_size);
755  size_t residual_buffer_size = 0;
756  size_t residual_buffer_alloc_size = alloc_size;
757 
758  while (!csv_reader.isScanFinished()) {
759  {
760  std::lock_guard<std::mutex> pending_requests_lock(
761  multi_threading_params.pending_requests_mutex);
762  if (!multi_threading_params.continue_processing) {
763  break;
764  }
765  }
766  auto request = get_request_from_pool(multi_threading_params);
767  resize_buffer_if_needed(request.buffer, request.buffer_alloc_size, alloc_size);
768 
769  if (residual_buffer_size > 0) {
770  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
771  }
772  size_t size = residual_buffer_size;
773  size += csv_reader.read(request.buffer.get() + residual_buffer_size,
774  alloc_size - residual_buffer_size);
775 
776  if (size == 0) {
777  // In some cases at the end of a file we will read 0 bytes even when
778  // csv_reader.isScanFinished() is false
779  continue;
780  } else if (size == 1 && request.buffer[0] == copy_params.line_delim) {
781  // In some cases files with newlines at the end will be encoded with a second
782  // newline that can end up being the only thing in the buffer
783  current_file_offset++;
784  continue;
785  }
786  unsigned int num_rows_in_buffer = 0;
787  request.end_pos =
789  request.buffer,
790  size,
791  copy_params,
792  first_row_index_in_buffer,
793  num_rows_in_buffer,
794  nullptr,
795  &csv_reader);
796  request.buffer_size = size;
797  request.buffer_alloc_size = alloc_size;
798  request.first_row_index = first_row_index_in_buffer;
799  request.file_offset = current_file_offset;
800  request.buffer_row_count = num_rows_in_buffer;
801 
802  residual_buffer_size = size - request.end_pos;
803  if (residual_buffer_size > 0) {
804  resize_buffer_if_needed(residual_buffer, residual_buffer_alloc_size, alloc_size);
805  memcpy(residual_buffer.get(),
806  request.buffer.get() + request.end_pos,
807  residual_buffer_size);
808  }
809 
810  current_file_offset += request.end_pos;
811  first_row_index_in_buffer += num_rows_in_buffer;
812 
813  dispatch_metadata_scan_request(multi_threading_params, request);
814  }
815 
816  std::unique_lock<std::mutex> pending_requests_queue_lock(
817  multi_threading_params.pending_requests_mutex);
818  multi_threading_params.pending_requests_condition.wait(
819  pending_requests_queue_lock, [&multi_threading_params] {
820  return multi_threading_params.pending_requests.empty() ||
821  (multi_threading_params.continue_processing == false);
822  });
823  multi_threading_params.continue_processing = false;
824  pending_requests_queue_lock.unlock();
825  multi_threading_params.pending_requests_condition.notify_all();
826 }
827 
828 namespace {
829 // Create metadata for unscanned columns
830 // Any fragments with any updated rows between start_row and num_rows will be updated
831 // Chunks prior to start_row will be restored from (ie for append
832 // workflows)
834  const ColumnDescriptor* column,
835  const ForeignTable* foreign_table,
836  const int db_id,
837  const size_t start_row,
838  const size_t total_num_rows,
839  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map) {
840  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
841  if (column->columnType.is_varlen_indeed()) {
842  chunk_key.emplace_back(1);
843  }
844 
845  // Create placeholder metadata for every fragment touched by this scan
846  int start_fragment = start_row / foreign_table->maxFragRows;
847  int end_fragment = total_num_rows / foreign_table->maxFragRows;
848  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
849  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
850  (fragment_id + 1)) > total_num_rows)
851  ? total_num_rows % foreign_table->maxFragRows
852  : foreign_table->maxFragRows;
853 
854  ForeignStorageBuffer empty_buffer;
855  // Use default encoder metadata as in parquet wrapper
856  empty_buffer.initEncoder(column->columnType);
857  auto chunk_metadata = empty_buffer.getEncoder()->getMetadata(column->columnType);
858  chunk_metadata->numElements = num_elements;
859  // signal to query engine populate, not set by default for arrays
860  chunk_metadata->chunkStats.min.intval = std::numeric_limits<int32_t>::max();
861  chunk_metadata->chunkStats.max.intval = std::numeric_limits<int32_t>::lowest();
862 
863  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
864  chunk_metadata_map[chunk_key] = chunk_metadata;
865  }
866 }
867 
868 } // namespace
869 
883  auto timer = DEBUG_TIMER(__func__);
884 
885  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
886  const auto file_path = getFullFilePath(foreign_table_);
888  CHECK(catalog);
889  auto& server_options = foreign_table_->foreign_server->options;
890  if (foreign_table_->isAppendMode() && csv_reader_ != nullptr) {
891  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
892  csv_reader_->checkForMoreRows(append_start_offset_);
893  } else {
894  UNREACHABLE();
895  }
896  } else {
897  chunk_metadata_map_.clear();
899  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
900  csv_reader_ = std::make_unique<LocalMultiFileReader>(file_path, copy_params);
901  } else {
902  UNREACHABLE();
903  }
904  num_rows_ = 0;
906  }
907 
908  auto columns = catalog->getAllColumnMetadataForTableUnlocked(
909  foreign_table_->tableId, false, false, true);
910  std::map<int32_t, const ColumnDescriptor*> column_by_id{};
911  for (auto column : columns) {
912  column_by_id[column->columnId] = column;
913  }
914  MetadataScanMultiThreadingParams multi_threading_params;
915 
916  // Restore previous chunk data
917  if (foreign_table_->isAppendMode()) {
918  multi_threading_params.chunk_byte_count = chunk_byte_count_;
919  multi_threading_params.chunk_encoder_buffers = std::move(chunk_encoder_buffers_);
920  }
921 
922  std::set<int> columns_to_scan;
923  for (auto column : columns) {
924  if (!skip_metadata_scan(column)) {
925  columns_to_scan.insert(column->columnId);
926  }
927  }
928  // Track where scan started for appends
929  int start_row = num_rows_;
930  if (!csv_reader_->isScanFinished()) {
931  auto buffer_size = get_buffer_size(copy_params,
932  csv_reader_->isRemainingSizeKnown(),
933  csv_reader_->getRemainingSize());
934  auto thread_count = get_thread_count(copy_params,
935  csv_reader_->isRemainingSizeKnown(),
936  csv_reader_->getRemainingSize(),
937  buffer_size);
938  multi_threading_params.continue_processing = true;
939 
940  std::vector<std::future<void>> futures{};
941  for (size_t i = 0; i < thread_count; i++) {
942  multi_threading_params.request_pool.emplace(buffer_size,
943  copy_params,
944  db_id_,
946  columns_to_scan,
948 
949  futures.emplace_back(std::async(std::launch::async,
951  std::ref(multi_threading_params),
953  }
954 
955  try {
957  file_path,
958  (*csv_reader_),
959  copy_params,
960  multi_threading_params,
961  num_rows_,
963  } catch (...) {
964  {
965  std::unique_lock<std::mutex> pending_requests_lock(
966  multi_threading_params.pending_requests_mutex);
967  multi_threading_params.continue_processing = false;
968  }
969  multi_threading_params.pending_requests_condition.notify_all();
970  throw;
971  }
972 
973  for (auto& future : futures) {
974  // get() instead of wait() because we need to propagate potential exceptions.
975  future.get();
976  }
977  }
978 
979  for (auto& [chunk_key, buffer] : multi_threading_params.chunk_encoder_buffers) {
980  auto chunk_metadata =
981  buffer->getEncoder()->getMetadata(column_by_id[chunk_key[2]]->columnType);
982  chunk_metadata->numElements = buffer->getEncoder()->getNumElems();
983  chunk_metadata->numBytes = multi_threading_params.chunk_byte_count[chunk_key];
984  chunk_metadata_map_[chunk_key] = chunk_metadata;
985  }
986 
987  for (auto column : columns) {
988  if (skip_metadata_scan(column)) {
990  column, foreign_table_, db_id_, start_row, num_rows_, chunk_metadata_map_);
991  }
992  }
993 
994  for (auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
995  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
996  }
997 
998  // Save chunk data
999  if (foreign_table_->isAppendMode()) {
1000  chunk_byte_count_ = multi_threading_params.chunk_byte_count;
1001  chunk_encoder_buffers_ = std::move(multi_threading_params.chunk_encoder_buffers);
1002  }
1003 
1004  // Any incomplete chunks should be cached now
1005  auto cache = get_cache_if_enabled(catalog);
1006  if (cache) {
1007  std::vector<ChunkKey> to_cache;
1008  for (auto& [chunk_key, buffer] : multi_threading_params.cached_chunks) {
1009  if (buffer.getBuffer()->getEncoder()->getNumElems() !=
1010  static_cast<size_t>(foreign_table_->maxFragRows)) {
1011  if (column_by_id[chunk_key[CHUNK_KEY_COLUMN_IDX]]
1012  ->columnType.is_varlen_indeed()) {
1013  ChunkKey index_chunk_key = chunk_key;
1014  index_chunk_key[4] = 2;
1015  to_cache.push_back(chunk_key);
1016  to_cache.push_back(index_chunk_key);
1017  } else {
1018  to_cache.push_back(chunk_key);
1019  }
1020  }
1021  }
1022  if (to_cache.size() > 0) {
1023  cache->cacheTableChunks(to_cache);
1024  }
1025  }
1026 }
1027 
// Serialization functions for FileRegion
// Writes a FileRegion out as a JSON object with one member per field.
// Counterpart of get_value() below; the two must stay in sync.
void set_value(rapidjson::Value& json_val,
               const FileRegion& file_region,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  json_utils::add_value_to_object(
      json_val, file_region.first_row_file_offset, "first_row_file_offset", allocator);
  json_utils::add_value_to_object(
      json_val, file_region.first_row_index, "first_row_index", allocator);
  json_utils::add_value_to_object(
      json_val, file_region.region_size, "region_size", allocator);
  json_utils::add_value_to_object(
      json_val, file_region.row_count, "row_count", allocator);
}
1042 
// Reads a FileRegion back from the JSON object produced by set_value() above.
// CHECK-fails if the value is not a JSON object; missing members are handled
// by json_utils::get_value_from_object.
void get_value(const rapidjson::Value& json_val, FileRegion& file_region) {
  CHECK(json_val.IsObject());
  json_utils::get_value_from_object(
      json_val, file_region.first_row_file_offset, "first_row_file_offset");
  json_utils::get_value_from_object(
      json_val, file_region.first_row_index, "first_row_index");
  json_utils::get_value_from_object(json_val, file_region.region_size, "region_size");
  json_utils::get_value_from_object(json_val, file_region.row_count, "row_count");
}
1052 
// Persists the wrapper's internal state (fragment-to-file-region map, csv reader
// metadata, row count, and append offset) as a JSON document at `file_path`, so
// that restoreDataWrapperInternals() can reconstruct it after a restart.
void CsvDataWrapper::serializeDataWrapperInternals(const std::string& file_path) const {
  rapidjson::Document d;
  d.SetObject();

  // Save fragment map
  json_utils::add_value_to_object(d,
                                  fragment_id_to_file_regions_map_,
                                  "fragment_id_to_file_regions_map",
                                  d.GetAllocator());

  // Save csv_reader metadata (reader-specific state, e.g. per-file offsets)
  rapidjson::Value reader_metadata(rapidjson::kObjectType);
  csv_reader_->serialize(reader_metadata, d.GetAllocator());
  d.AddMember("reader_metadata", reader_metadata, d.GetAllocator());

  json_utils::add_value_to_object(d, num_rows_, "num_rows", d.GetAllocator());
  json_utils::add_value_to_object(
      d, append_start_offset_, "append_start_offset", d.GetAllocator());

  json_utils::write_to_file(d, file_path);
}
1074 
// Restores the wrapper state previously written by serializeDataWrapperInternals():
// rebuilds the fragment map, reconstructs the csv reader from its serialized
// metadata, and (in append mode) recreates per-chunk encoder buffers from the
// supplied chunk metadata so appends can resume where the last scan stopped.
void CsvDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  // Restore fragment map
  json_utils::get_value_from_object(
      d, fragment_id_to_file_regions_map_, "fragment_id_to_file_regions_map");

  // Construct csv_reader with metadata
  CHECK(d.HasMember("reader_metadata"));
  const auto copy_params = Csv::validate_and_get_copy_params(foreign_table_);
  const auto csv_file_path = getFullFilePath(foreign_table_);
  auto& server_options = foreign_table_->foreign_server->options;
  // Only local file storage currently supports serialized reader state.
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    csv_reader_ = std::make_unique<LocalMultiFileReader>(
        csv_file_path, copy_params, d["reader_metadata"]);
  } else {
    UNREACHABLE();
  }

  json_utils::get_value_from_object(d, num_rows_, "num_rows");
  json_utils::get_value_from_object(d, append_start_offset_, "append_start_offset");

  // Now restore the internal metadata maps
  CHECK(chunk_metadata_map_.empty());
  CHECK(chunk_encoder_buffers_.empty());

  for (const auto& [chunk_key, metadata] : chunk_metadata) {
    chunk_metadata_map_[chunk_key] = metadata;

    if (foreign_table_->isAppendMode()) {
      // Restore encoder state for append mode. Bind a reference to the map
      // slot once instead of repeating the lookup for every mutation.
      auto& encoder_buffer = chunk_encoder_buffers_[chunk_key];
      encoder_buffer = std::make_unique<ForeignStorageBuffer>();
      encoder_buffer->initEncoder(metadata->sqlType);
      encoder_buffer->setSize(metadata->numBytes);
      encoder_buffer->getEncoder()->setNumElems(metadata->numElements);
      encoder_buffer->getEncoder()->resetChunkStats(metadata->chunkStats);
      encoder_buffer->setUpdated();
      chunk_byte_count_[chunk_key] = metadata->numBytes;
    }
  }
  is_restored_ = true;
}
1122 
1124  return is_restored_;
1125 }
1126 
// CSV-specific foreign table options accepted by this wrapper, merged with the
// base foreign-table options in getAllCsvTableOptions().
const std::set<std::string_view> CsvDataWrapper::csv_table_options_{"ARRAY_DELIMITER",
                                                                    "ARRAY_MARKER",
                                                                    "BUFFER_SIZE",
                                                                    "DELIMITER",
                                                                    "ESCAPE",
                                                                    "HEADER",
                                                                    "LINE_DELIMITER",
                                                                    "LONLAT",
                                                                    "NULLS",
                                                                    "QUOTE",
                                                                    "QUOTED",
                                                                    "S3_ACCESS_TYPE"};
1139 } // namespace foreign_storage
std::map< int, DataBlockPtr > column_id_to_data_blocks_map
bool isRestored() const override
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
size_t get_var_length_data_block_size(DataBlockPtr data_block, SQLTypeInfo sql_type_info)
void populateChunkBuffers(std::map< ChunkKey, AbstractBuffer * > &required_buffers, std::map< ChunkKey, AbstractBuffer * > &optional_buffers) override
void serializeDataWrapperInternals(const std::string &file_path) const override
std::map< ChunkKey, size_t > chunk_byte_count_
static const std::set< std::string_view > csv_table_options_
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers_
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:218
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:92
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:219
void cache_blocks(std::map< ChunkKey, Chunk_NS::Chunk > &cached_chunks, DataBlockPtr data_block, size_t row_count, ChunkKey &chunk_key, const ColumnDescriptor *column, bool is_first_block, bool is_last_block)
ParseBufferResult parse_buffer(ParseBufferRequest &request, bool convert_data_blocks)
foreign_storage::ForeignStorageCache * get_cache_if_enabled(std::shared_ptr< Catalog_Namespace::Catalog > &catalog)
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
bool is_varlen() const
Definition: sqltypes.h:500
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
virtual size_t read(void *buffer, size_t max_size)=0
#define UNREACHABLE()
Definition: Logger.h:241
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
Constants for Builtin SQL Types supported by OmniSci.
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_encoder_buffers
std::unique_ptr< CsvReader > csv_reader_
std::set< std::string_view > getAllCsvTableOptions() const
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:108
void add_placeholder_metadata(const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
void validateTableOptions(const ForeignTable *foreign_table) const override
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:87
int32_t StringOffsetT
Definition: sqltypes.h:919
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result, const std::string &file_path)
std::vector< FileRegion > FileRegions
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
static SysCatalog & instance()
Definition: SysCatalog.h:286
std::map< ChunkKey, Chunk_NS::Chunk > cached_chunks
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
rapidjson::Document read_from_file(const std::string &file_path)
virtual size_t readRegion(void *buffer, size_t offset, size_t size)=0
std::map< int, FileRegions > fragment_id_to_file_regions_map_
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool is_dict_encoded_type() const
Definition: sqltypes.h:516
An AbstractBuffer is a unit of data management for a data manager.
bool operator<(const ParseFileRegionResult &other) const
void updateMetadata(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
std::queue< csv_file_buffer_parser::ParseBufferRequest > request_pool
specifies the content in-memory of a row in the column metadata table
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void write_to_file(const rapidjson::Document &document, const std::string &filepath)
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool isAppendMode() const
Checks if the table is in append mode.
const std::set< std::string_view > & getSupportedTableOptions() const override
void populateChunkMapForColumns(const std::set< const ColumnDescriptor * > &columns, const int fragment_id, const std::map< ChunkKey, AbstractBuffer * > &buffers, std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void validateTableOptions(const ForeignTable *foreign_table) const override
size_t get_buffer_size(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
size_t get_thread_count(const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
int32_t ArrayOffsetT
Definition: sqltypes.h:920
std::queue< csv_file_buffer_parser::ParseBufferRequest > pending_requests
void scan_metadata(MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
virtual bool isScanFinished()=0
ParseFileRegionResult parse_file_regions(const FileRegions &file_regions, const size_t start_index, const size_t end_index, CsvReader &csv_reader, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map)
void process_data_blocks(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id, std::map< int, FileRegions > &fragment_id_to_file_regions_map)
const std::set< std::string_view > & getSupportedTableOptions() const override
bool skip_metadata_scan(const ColumnDescriptor *column)
const ForeignTable * foreign_table_
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:490
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void populateChunks(std::map< int, Chunk_NS::Chunk > &column_id_to_chunk_map, int fragment_id)
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:506
bool is_string() const
Definition: sqltypes.h:478
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
int8_t * numbersPtr
Definition: sqltypes.h:217
std::set< const ColumnDescriptor * > get_columns(const std::map< ChunkKey, AbstractBuffer * > &buffers, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const int32_t table_id, const int fragment_id)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...
virtual void updateStats(const int64_t val, const bool is_null)=0
virtual void reserve(size_t num_bytes)=0
void resize_buffer_if_needed(std::unique_ptr< char[]> &buffer, size_t &buffer_size, const size_t alloc_size)
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
size_t find_row_end_pos(size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FILE *file, foreign_storage::CsvReader *csv_reader)
Finds the closest possible row ending to the end of the given buffer. The buffer is resized as needed...
bool is_array() const
Definition: sqltypes.h:486
void dispatch_metadata_scan_requests(const size_t &buffer_size, const std::string &file_path, CsvReader &csv_reader, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params, size_t &first_row_index_in_buffer, size_t &current_file_offset)
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31