OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage Namespace Reference

Namespaces

 anonymous_namespace{LazyParquetImporter.cpp}
 
 anonymous_namespace{ParquetDataWrapper.cpp}
 
 csv_file_buffer_parser
 

Classes

struct  DataWrapperType
 Encapsulates an enumeration of foreign data wrapper type strings. More...
 
struct  ForeignServer
 
struct  ForeignTable
 
struct  OptionsContainer
 
struct  ParseFileRegionResult
 
struct  MetadataScanMultiThreadingParams
 
struct  FileRegion
 
class  CsvDataWrapper
 
class  ForeignDataWrapper
 
class  ForeignStorageBuffer
 
class  ForeignStorageCache
 
class  ForeignStorageMgr
 
class  ForeignTableColumnMap
 
struct  RowGroupType
 
struct  ColumnType
 
struct  FragmentType
 
struct  Interval
 
struct  RowGroupMetadata
 
class  LazyParquetImporter
 
class  ParquetDataWrapper
 
struct  AllowedParquetMetadataTypeMappings
 

Typedefs

using FileRegions = std::vector< FileRegion >
 
using read_lock = mapd_shared_lock< mapd_shared_mutex >
 
using write_lock = mapd_unique_lock< mapd_shared_mutex >
 

Functions

void add_end_of_line_if_needed (size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)
 
ParseFileRegionResult parse_file_regions (const FileRegions &file_regions, const size_t start_index, const size_t end_index, FILE *file, std::mutex &file_access_mutex, csv_file_buffer_parser::ParseBufferRequest &parse_file_request, const ChunkKey &chunk_key)
 
std::FILE * open_file (const std::string &file_path)
 
size_t get_buffer_size (const import_export::CopyParams &copy_params, const size_t file_size)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const size_t file_size, const size_t buffer_size)
 
void initialize_import_buffers (const std::list< const ColumnDescriptor * > &columns, std::shared_ptr< Catalog_Namespace::Catalog > catalog, std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
std::vector< size_t > partition_by_fragment (const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
 
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request (MetadataScanMultiThreadingParams &multi_threading_params)
 
void add_file_region (std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result)
 
size_t get_var_length_data_block_size (DataBlockPtr data_block, SQLTypeInfo sql_type_info)
 
void update_stats (Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
 
void update_metadata (MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, const csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id)
 
void add_request_to_pool (MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
 
void scan_metadata (MetadataScanMultiThreadingParams &multi_threading_params, std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex)
 
size_t get_offset_after_header (const std::string &file_path, const import_export::CopyParams &copy_params)
 
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool (MetadataScanMultiThreadingParams &multi_threading_params)
 
void dispatch_metadata_scan_request (MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
 
void dispatch_metadata_scan_requests (const size_t buffer_size, const std::string &file_path, std::FILE *file, const import_export::CopyParams &copy_params, MetadataScanMultiThreadingParams &multi_threading_params)
 
 foreign_storage_cache_ (std::make_unique< ForeignStorageCache >(gfm, foreign_cache_entry_limit))
 
void validate_non_foreign_table_write (const TableDescriptor *table_descriptor)
 

Typedef Documentation

using foreign_storage::FileRegions = typedef std::vector<FileRegion>

Definition at line 52 of file CsvDataWrapper.h.

Function Documentation

void foreign_storage::add_end_of_line_if_needed ( size_t &  read_size,
const size_t  buffer_size,
char *  buffer,
const char  line_delim 
)

Adds an end of line character (specified by the line_delim parameter) to the provided buffer, if this is the last read buffer and the buffer does not already end with an end of line character. This allows for appropriate parsing by the csv_file_buffer_parser utility functions, which expect the end of each row to be indicated by an end of line character in the buffer.

Definition at line 213 of file CsvDataWrapper.cpp.

Referenced by dispatch_metadata_scan_requests(), and parse_file_regions().

216  {
217  if (read_size > 0 && read_size < buffer_size && buffer[read_size - 1] != line_delim) {
218  buffer[read_size] = line_delim;
219  read_size++;
220  }
221 }

+ Here is the caller graph for this function:

void foreign_storage::add_file_region ( std::map< int, FileRegions > &  fragment_id_to_file_regions_map,
std::mutex &  file_region_mutex,
int  fragment_id,
size_t  first_row_index,
const csv_file_buffer_parser::ParseBufferResult &  result 
)

Creates a new file region based on metadata from parsed CSV file buffers and adds the new region to the fragment id to file regions map.

Definition at line 490 of file CsvDataWrapper.cpp.

References foreign_storage::FileRegion::first_row_file_offset, foreign_storage::FileRegion::first_row_index, foreign_storage::FileRegion::region_size, foreign_storage::FileRegion::row_count, foreign_storage::csv_file_buffer_parser::ParseBufferResult::row_count, and foreign_storage::csv_file_buffer_parser::ParseBufferResult::row_offsets.

Referenced by scan_metadata().

494  {
495  FileRegion file_region;
496  file_region.first_row_file_offset = result.row_offsets.front();
497  file_region.region_size = result.row_offsets.back() - file_region.first_row_file_offset;
498  file_region.first_row_index = first_row_index;
499  file_region.row_count = result.row_count;
500 
501  {
502  std::lock_guard<std::mutex> lock(file_region_mutex);
503  fragment_id_to_file_regions_map[fragment_id].emplace_back(file_region);
504  }
505 }

+ Here is the caller graph for this function:

void foreign_storage::add_request_to_pool ( MetadataScanMultiThreadingParams &  multi_threading_params,
csv_file_buffer_parser::ParseBufferRequest &  request 
)

Adds the request object for a processed request back to the request pool for reuse in subsequent requests.

Definition at line 598 of file CsvDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::MetadataScanMultiThreadingParams::request_pool_condition, and foreign_storage::MetadataScanMultiThreadingParams::request_pool_mutex.

Referenced by scan_metadata().

599  {
600  std::unique_lock<std::mutex> completed_requests_queue_lock(
601  multi_threading_params.request_pool_mutex);
602  multi_threading_params.request_pool.emplace(std::move(request));
603  completed_requests_queue_lock.unlock();
604  multi_threading_params.request_pool_condition.notify_all();
605 }

+ Here is the caller graph for this function:

void foreign_storage::dispatch_metadata_scan_request ( MetadataScanMultiThreadingParams &  multi_threading_params,
csv_file_buffer_parser::ParseBufferRequest &  request 
)

Dispatches a new metadata scan request by adding the request to the pending requests queue to be consumed by a worker thread.

Definition at line 689 of file CsvDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::pending_requests, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, and foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex.

Referenced by dispatch_metadata_scan_requests().

691  {
692  std::unique_lock<std::mutex> pending_requests_lock(
693  multi_threading_params.pending_requests_mutex);
694  multi_threading_params.pending_requests.emplace(std::move(request));
695  pending_requests_lock.unlock();
696  multi_threading_params.pending_requests_condition.notify_all();
697 }

+ Here is the caller graph for this function:

void foreign_storage::dispatch_metadata_scan_requests ( const size_t  buffer_size,
const std::string &  file_path,
std::FILE *  file,
const import_export::CopyParams &  copy_params,
MetadataScanMultiThreadingParams &  multi_threading_params 
)

Reads from a CSV file iteratively and dispatches metadata scan requests that are processed by worker threads.

Definition at line 703 of file CsvDataWrapper.cpp.

References add_end_of_line_if_needed(), foreign_storage::MetadataScanMultiThreadingParams::continue_processing, dispatch_metadata_scan_request(), import_export::delimited_parser::find_end(), get_offset_after_header(), get_request_from_pool(), import_export::CopyParams::line_delim, foreign_storage::MetadataScanMultiThreadingParams::pending_requests, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, and foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex.

Referenced by foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

708  {
709  auto residual_buffer = std::make_unique<char[]>(buffer_size);
710  size_t residual_buffer_size = 0;
711  size_t current_file_offset = get_offset_after_header(file_path, copy_params);
712  size_t first_row_index_in_buffer = 0;
713  fseek(file, current_file_offset, SEEK_SET);
714 
715  while (!feof(file)) {
716  auto request = get_request_from_pool(multi_threading_params);
717  if (residual_buffer_size > 0) {
718  memcpy(request.buffer.get(), residual_buffer.get(), residual_buffer_size);
719  }
720  size_t size = residual_buffer_size;
721  size += fread(request.buffer.get() + residual_buffer_size,
722  1,
723  buffer_size - residual_buffer_size,
724  file);
725  add_end_of_line_if_needed(
726  size, buffer_size, request.buffer.get(), copy_params.line_delim);
727 
728  unsigned int num_rows_in_buffer = 0;
729  request.end_pos = import_export::delimited_parser::find_end(
730  request.buffer.get(), size, copy_params, num_rows_in_buffer);
731  request.first_row_index = first_row_index_in_buffer;
732  request.file_offset = current_file_offset;
733  request.buffer_row_count = num_rows_in_buffer;
734 
735  residual_buffer_size = size - request.end_pos;
736  if (residual_buffer_size > 0) {
737  memcpy(residual_buffer.get(),
738  request.buffer.get() + request.end_pos,
739  residual_buffer_size);
740  }
741 
742  current_file_offset += request.end_pos;
743  first_row_index_in_buffer += num_rows_in_buffer;
744 
745  dispatch_metadata_scan_request(multi_threading_params, request);
746  }
747 
748  std::unique_lock<std::mutex> pending_requests_queue_lock(
749  multi_threading_params.pending_requests_mutex);
750  multi_threading_params.pending_requests_condition.wait(
751  pending_requests_queue_lock, [&multi_threading_params] {
752  return multi_threading_params.pending_requests.empty();
753  });
754  multi_threading_params.continue_processing = false;
755  pending_requests_queue_lock.unlock();
756  multi_threading_params.pending_requests_condition.notify_all();
757 }
size_t get_offset_after_header(const std::string &file_path, const import_export::CopyParams &copy_params)
void dispatch_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
size_t find_end(const char *buffer, size_t size, const import_export::CopyParams &copy_params, unsigned int &num_rows_this_buffer)
Finds the closest possible row ending to the end of the given buffer.
csv_file_buffer_parser::ParseBufferRequest get_request_from_pool(MetadataScanMultiThreadingParams &multi_threading_params)
void add_end_of_line_if_needed(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

foreign_storage::foreign_storage_cache_ ( std::make_unique< ForeignStorageCache >(gfm, foreign_cache_entry_limit) )

Definition at line 32 of file ForeignStorageMgr.cpp.

33  {}
size_t foreign_storage::get_buffer_size ( const import_export::CopyParams &  copy_params,
const size_t  file_size 
)

Gets the appropriate buffer size to be used when processing CSV file(s).

Definition at line 300 of file CsvDataWrapper.cpp.

References import_export::CopyParams::buffer_size.

Referenced by foreign_storage::CsvDataWrapper::populateChunk(), and foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

301  {
302  size_t buffer_size = copy_params.buffer_size;
303  if (file_size < buffer_size) {
304  buffer_size = file_size + 1; // +1 for end of line character, if missing
305  }
306  return buffer_size;
307 }

+ Here is the caller graph for this function:

std::optional<csv_file_buffer_parser::ParseBufferRequest> foreign_storage::get_next_metadata_scan_request ( MetadataScanMultiThreadingParams &  multi_threading_params)

Gets the next metadata scan request object from the pending requests queue. A null optional is returned if there are no further requests to be processed.

Definition at line 467 of file CsvDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::continue_processing, foreign_storage::MetadataScanMultiThreadingParams::pending_requests, foreign_storage::MetadataScanMultiThreadingParams::pending_requests_condition, and foreign_storage::MetadataScanMultiThreadingParams::pending_requests_mutex.

Referenced by scan_metadata().

468  {
469  std::unique_lock<std::mutex> pending_requests_lock(
470  multi_threading_params.pending_requests_mutex);
471  multi_threading_params.pending_requests_condition.wait(
472  pending_requests_lock, [&multi_threading_params] {
473  return !multi_threading_params.pending_requests.empty() ||
474  !multi_threading_params.continue_processing;
475  });
476  if (multi_threading_params.pending_requests.empty()) {
477  return {};
478  }
479  auto request = std::move(multi_threading_params.pending_requests.front());
480  multi_threading_params.pending_requests.pop();
481  pending_requests_lock.unlock();
482  multi_threading_params.pending_requests_condition.notify_all();
483  return std::move(request);
484 }

+ Here is the caller graph for this function:

size_t foreign_storage::get_offset_after_header ( const std::string &  file_path,
const import_export::CopyParams &  copy_params 
)

Gets the byte offset in a CSV file after skipping the header (if present).

Definition at line 653 of file CsvDataWrapper.cpp.

References CHECK(), import_export::CopyParams::has_header, import_export::CopyParams::line_delim, and import_export::NO_HEADER.

Referenced by dispatch_metadata_scan_requests().

654  {
655  size_t file_offset = 0;
656  const auto& header_param = copy_params.has_header;
657  if (header_param != import_export::ImportHeaderRow::NO_HEADER) {
658  std::ifstream file{file_path};
659  CHECK(file.good());
660  std::string line;
661  std::getline(file, line, copy_params.line_delim);
662  file.close();
663  file_offset = line.size() + 1;
664  }
665  return file_offset;
666 }
ImportHeaderRow has_header
Definition: CopyParams.h:48
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

csv_file_buffer_parser::ParseBufferRequest foreign_storage::get_request_from_pool ( MetadataScanMultiThreadingParams &  multi_threading_params)

Gets a request from the metadata scan request pool.

Definition at line 671 of file CsvDataWrapper.cpp.

References CHECK(), foreign_storage::MetadataScanMultiThreadingParams::request_pool, foreign_storage::MetadataScanMultiThreadingParams::request_pool_condition, and foreign_storage::MetadataScanMultiThreadingParams::request_pool_mutex.

Referenced by dispatch_metadata_scan_requests().

672  {
673  std::unique_lock<std::mutex> request_pool_lock(
674  multi_threading_params.request_pool_mutex);
675  multi_threading_params.request_pool_condition.wait(
676  request_pool_lock,
677  [&multi_threading_params] { return !multi_threading_params.request_pool.empty(); });
678  auto request = std::move(multi_threading_params.request_pool.front());
679  multi_threading_params.request_pool.pop();
680  request_pool_lock.unlock();
681  CHECK(request.buffer);
682  return request;
683 }
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::get_thread_count ( const import_export::CopyParams &  copy_params,
const size_t  file_size,
const size_t  buffer_size 
)

Gets the appropriate number of threads to be used for concurrent processing within the data wrapper.

Definition at line 313 of file CsvDataWrapper.cpp.

References CHECK(), and import_export::CopyParams::threads.

Referenced by foreign_storage::CsvDataWrapper::populateChunk(), and foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

315  {
316  size_t thread_count = copy_params.threads;
317  if (thread_count == 0) {
318  thread_count = std::thread::hardware_concurrency();
319  }
320  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
321  if (num_buffers_in_file < thread_count) {
322  thread_count = num_buffers_in_file;
323  }
324  CHECK(thread_count);
325  return thread_count;
326 }
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::get_var_length_data_block_size ( DataBlockPtr  data_block,
SQLTypeInfo  sql_type_info 
)

Gets the total number of bytes in the given data block for a variable length column.

Definition at line 511 of file CsvDataWrapper.cpp.

References DataBlockPtr::arraysPtr, CHECK(), SQLTypeInfo::is_array(), SQLTypeInfo::is_geometry(), SQLTypeInfo::is_string(), SQLTypeInfo::is_varlen(), DataBlockPtr::stringsPtr, and UNREACHABLE.

Referenced by update_metadata().

512  {
513  CHECK(sql_type_info.is_varlen());
514  size_t byte_count = 0;
515  if (sql_type_info.is_string() || sql_type_info.is_geometry()) {
516  for (const auto& str : *data_block.stringsPtr) {
517  byte_count += str.length();
518  }
519  } else if (sql_type_info.is_array()) {
520  for (const auto& array : *data_block.arraysPtr) {
521  byte_count += array.length;
522  }
523  } else {
524  UNREACHABLE();
525  }
526  return byte_count;
527 }
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:149
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:150
bool is_varlen() const
Definition: sqltypes.h:430
#define UNREACHABLE()
Definition: Logger.h:241
CHECK(cgen_state)
bool is_geometry() const
Definition: sqltypes.h:427
bool is_string() const
Definition: sqltypes.h:415
bool is_array() const
Definition: sqltypes.h:423

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::initialize_import_buffers ( const std::list< const ColumnDescriptor * > &  columns,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog,
std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers 
)

Initializes import buffers for each of the provided columns.

Definition at line 331 of file CsvDataWrapper.cpp.

References IS_STRING, and kENCODING_DICT.

Referenced by foreign_storage::CsvDataWrapper::populateChunk(), and foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

334  {
335  for (const auto column : columns) {
336  StringDictionary* string_dictionary = nullptr;
337  if (column->columnType.is_dict_encoded_string() ||
338  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
339  column->columnType.get_compression() == kENCODING_DICT)) {
340  auto dict_descriptor =
341  catalog->getMetadataForDictUnlocked(column->columnType.get_comp_param(), true);
342  string_dictionary = dict_descriptor->stringDict.get();
343  }
344  import_buffers.emplace_back(
345  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
346  }
347 }
#define IS_STRING(T)
Definition: sqltypes.h:172

+ Here is the caller graph for this function:

std::FILE* foreign_storage::open_file ( const std::string &  file_path)

Opens the file at the provided file path as a binary file in read mode. An exception is thrown if the attempt to open the file fails.

Definition at line 288 of file CsvDataWrapper.cpp.

Referenced by foreign_storage::CsvDataWrapper::populateChunk(), and foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

288  {
289  auto file = fopen(file_path.c_str(), "rb");
290  if (!file) {
291  throw std::runtime_error{"An error occurred when attempting to open file \"" +
292  file_path + "\". " + strerror(errno)};
293  }
294  return file;
295 }

+ Here is the caller graph for this function:

ParseFileRegionResult foreign_storage::parse_file_regions ( const FileRegions &  file_regions,
const size_t  start_index,
const size_t  end_index,
FILE *  file,
std::mutex &  file_access_mutex,
csv_file_buffer_parser::ParseBufferRequest &  parse_file_request,
const ChunkKey &  chunk_key 
)

Parses a set of file regions given a handle to the file and range of indexes for the file regions to be parsed.

Definition at line 240 of file CsvDataWrapper.cpp.

References add_end_of_line_if_needed(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::begin_pos, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::buffer_size, CHECK(), CHECK_EQ, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::copy_params, foreign_storage::csv_file_buffer_parser::ParseBufferResult::data_blocks, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::end_pos, foreign_storage::ParseFileRegionResult::file_offset, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::file_offset, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::first_row_index, import_export::CopyParams::line_delim, foreign_storage::csv_file_buffer_parser::parse_buffer(), foreign_storage::csv_file_buffer_parser::ParseBufferRequest::process_row_count, run_benchmark_import::result, and foreign_storage::csv_file_buffer_parser::ParseBufferResult::row_count.

Referenced by foreign_storage::CsvDataWrapper::populateChunk().

247  {
248  ParseFileRegionResult load_file_region_result{};
249  load_file_region_result.file_offset = file_regions[start_index].first_row_file_offset;
250  load_file_region_result.row_count = 0;
251 
252  csv_file_buffer_parser::ParseBufferResult result;
253  for (size_t i = start_index; i <= end_index; i++) {
254  CHECK(file_regions[i].region_size <= parse_file_request.buffer_size);
255  size_t read_size;
256  {
257  std::lock_guard<std::mutex> lock(file_access_mutex);
258  fseek(file, file_regions[i].first_row_file_offset, SEEK_SET);
259  read_size =
260  fread(parse_file_request.buffer.get(), 1, file_regions[i].region_size, file);
261  }
262  add_end_of_line_if_needed(read_size,
263  parse_file_request.buffer_size,
264  parse_file_request.buffer.get(),
265  parse_file_request.copy_params.line_delim);
266 
267  CHECK_EQ(file_regions[i].region_size, read_size);
268  parse_file_request.begin_pos = 0;
269  parse_file_request.end_pos = file_regions[i].region_size;
270  parse_file_request.first_row_index = file_regions[i].first_row_index;
271  parse_file_request.file_offset = file_regions[i].first_row_file_offset;
272  parse_file_request.process_row_count = file_regions[i].row_count;
273 
274  result = parse_buffer(parse_file_request);
275  CHECK(result.data_blocks.find(chunk_key[2]) != result.data_blocks.end());
276  CHECK_EQ(file_regions[i].row_count, result.row_count);
277  load_file_region_result.row_count += result.row_count;
278  }
279  CHECK(result.data_blocks.find(chunk_key[2]) != result.data_blocks.end());
280  load_file_region_result.data_blocks = result.data_blocks.find(chunk_key[2])->second;
281  return load_file_region_result;
282 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
ParseBufferResult parse_buffer(ParseBufferRequest &request)
CHECK(cgen_state)
void add_end_of_line_if_needed(size_t &read_size, const size_t buffer_size, char *buffer, const char line_delim)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector<size_t> foreign_storage::partition_by_fragment ( const size_t  start_row_index,
const size_t  max_fragment_size,
const size_t  buffer_row_count 
)

Given a start row index, maximum fragment size, and number of rows in a buffer, this function returns a vector indicating how the rows in the buffer should be partitioned in order to fill up available fragment slots while staying within the capacity of fragments.

Definition at line 415 of file CsvDataWrapper.cpp.

References CHECK().

Referenced by scan_metadata().

417  {
418  CHECK(buffer_row_count > 0);
419  std::vector<size_t> partitions{};
420  size_t remaining_rows_in_last_fragment;
421  if (start_row_index % max_fragment_size == 0) {
422  remaining_rows_in_last_fragment = 0;
423  } else {
424  remaining_rows_in_last_fragment =
425  max_fragment_size - (start_row_index % max_fragment_size);
426  }
427  if (buffer_row_count <= remaining_rows_in_last_fragment) {
428  partitions.emplace_back(buffer_row_count);
429  } else {
430  if (remaining_rows_in_last_fragment > 0) {
431  partitions.emplace_back(remaining_rows_in_last_fragment);
432  }
433  size_t remaining_buffer_row_count =
434  buffer_row_count - remaining_rows_in_last_fragment;
435  while (remaining_buffer_row_count > 0) {
436  partitions.emplace_back(
437  std::min<size_t>(remaining_buffer_row_count, max_fragment_size));
438  remaining_buffer_row_count -= partitions.back();
439  }
440  }
441  return partitions;
442 }
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::scan_metadata ( MetadataScanMultiThreadingParams &  multi_threading_params,
std::map< int, FileRegions > &  fragment_id_to_file_regions_map,
std::mutex &  file_region_mutex 
)

Consumes and processes metadata scan requests from a pending requests queue and updates existing metadata objects based on newly scanned metadata.

Definition at line 611 of file CsvDataWrapper.cpp.

References add_file_region(), add_request_to_pool(), get_next_metadata_scan_request(), foreign_storage::csv_file_buffer_parser::parse_buffer(), partition_by_fragment(), run_benchmark_import::result, and update_metadata().

Referenced by foreign_storage::CsvDataWrapper::populateMetadataForChunkKeyPrefix().

613  {
614  std::map<int, const ColumnDescriptor*> column_by_id{};
615  while (true) {
616  auto request_opt = get_next_metadata_scan_request(multi_threading_params);
617  if (!request_opt.has_value()) {
618  break;
619  }
620  auto& request = request_opt.value();
621  if (column_by_id.empty()) {
622  for (const auto column : request.columns) {
623  column_by_id[column->columnId] = column;
624  }
625  }
626  auto partitions = partition_by_fragment(
627  request.first_row_index, request.max_fragment_rows, request.buffer_row_count);
628  request.begin_pos = 0;
629  size_t row_index = request.first_row_index;
630  for (const auto partition : partitions) {
631  request.process_row_count = partition;
632  for (const auto& import_buffer : request.import_buffers) {
633  import_buffer->clear();
634  }
635  auto result = parse_buffer(request);
636  int fragment_id = row_index / request.max_fragment_rows;
637  add_file_region(fragment_id_to_file_regions_map,
638  file_region_mutex,
639  fragment_id,
640  request.first_row_index,
641  result);
642  update_metadata(multi_threading_params, fragment_id, request, result, column_by_id);
643  row_index += result.row_count;
644  request.begin_pos = result.row_offsets.back() - request.file_offset;
645  }
646  add_request_to_pool(multi_threading_params, request);
647  }
648 }
void add_request_to_pool(MetadataScanMultiThreadingParams &multi_threading_params, csv_file_buffer_parser::ParseBufferRequest &request)
ParseBufferResult parse_buffer(ParseBufferRequest &request)
std::vector< size_t > partition_by_fragment(const size_t start_row_index, const size_t max_fragment_size, const size_t buffer_row_count)
void add_file_region(std::map< int, FileRegions > &fragment_id_to_file_regions_map, std::mutex &file_region_mutex, int fragment_id, size_t first_row_index, const csv_file_buffer_parser::ParseBufferResult &result)
std::optional< csv_file_buffer_parser::ParseBufferRequest > get_next_metadata_scan_request(MetadataScanMultiThreadingParams &multi_threading_params)
void update_metadata(MetadataScanMultiThreadingParams &multi_threading_params, int fragment_id, const csv_file_buffer_parser::ParseBufferRequest &request, const csv_file_buffer_parser::ParseBufferResult &result, std::map< int, const ColumnDescriptor * > &column_by_id)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::update_metadata ( MetadataScanMultiThreadingParams &  multi_threading_params,
int  fragment_id,
const csv_file_buffer_parser::ParseBufferRequest &  request,
const csv_file_buffer_parser::ParseBufferResult &  result,
std::map< int, const ColumnDescriptor * > &  column_by_id 
)

Updates the metadata encapsulated in encoders for all table columns, given new data blocks obtained from parsing a new set of rows in a CSV file buffer.

Definition at line 550 of file CsvDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::chunk_byte_count, foreign_storage::MetadataScanMultiThreadingParams::chunk_byte_count_mutex, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers, foreign_storage::MetadataScanMultiThreadingParams::chunk_encoder_buffers_mutex, foreign_storage::csv_file_buffer_parser::ParseBufferResult::data_blocks, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::db_id, get_var_length_data_block_size(), foreign_storage::csv_file_buffer_parser::ParseBufferResult::row_count, foreign_storage::csv_file_buffer_parser::ParseBufferRequest::table_id, and update_stats().

Referenced by scan_metadata().

554  {
555  for (auto& [column_id, data_block] : result.data_blocks) {
556  ChunkKey chunk_key{request.db_id, request.table_id, column_id, fragment_id};
557  const auto column = column_by_id[column_id];
558  size_t byte_count;
559  if (column->columnType.is_varlen() && !column->columnType.is_fixlen_array()) {
560  chunk_key.emplace_back(1);
561  byte_count = get_var_length_data_block_size(data_block, column->columnType);
562  } else {
563  byte_count = column->columnType.get_logical_size() * result.row_count;
564  }
565 
566  {
567  std::lock_guard<std::mutex> lock(multi_threading_params.chunk_byte_count_mutex);
568  multi_threading_params.chunk_byte_count[chunk_key] += byte_count;
569  }
570 
571  {
572  std::lock_guard<std::mutex> lock(
573  multi_threading_params.chunk_encoder_buffers_mutex);
574  if (multi_threading_params.chunk_encoder_buffers.find(chunk_key) ==
575  multi_threading_params.chunk_encoder_buffers.end()) {
576  multi_threading_params.chunk_encoder_buffers[chunk_key] =
577  std::make_unique<ForeignStorageBuffer>();
578  multi_threading_params.chunk_encoder_buffers[chunk_key]->initEncoder(
579  column->columnType);
580  }
581  update_stats(multi_threading_params.chunk_encoder_buffers[chunk_key]->encoder.get(),
582  column->columnType,
583  data_block,
584  result.row_count);
585  size_t num_elements = multi_threading_params.chunk_encoder_buffers[chunk_key]
586  ->encoder->getNumElems() +
587  result.row_count;
588  multi_threading_params.chunk_encoder_buffers[chunk_key]->encoder->setNumElems(
589  num_elements);
590  }
591  }
592 }
std::vector< int > ChunkKey
Definition: types.h:35
size_t get_var_length_data_block_size(DataBlockPtr data_block, SQLTypeInfo sql_type_info)
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::update_stats ( Encoder *  encoder,
const SQLTypeInfo &  column_type,
DataBlockPtr  data_block,
const size_t  row_count 
)

Updates the statistics metadata encapsulated in the encoder given new data in a data block.

Definition at line 533 of file CsvDataWrapper.cpp.

References DataBlockPtr::arraysPtr, SQLTypeInfo::is_array(), SQLTypeInfo::is_varlen(), DataBlockPtr::numbersPtr, DataBlockPtr::stringsPtr, and Encoder::updateStats().

Referenced by update_metadata(), and Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata().

536  {
537  if (column_type.is_array()) {
538  encoder->updateStats(data_block.arraysPtr, 0, row_count);
539  } else if (!column_type.is_varlen()) {
540  encoder->updateStats(data_block.numbersPtr, row_count);
541  } else {
542  encoder->updateStats(data_block.stringsPtr, 0, row_count);
543  }
544 }
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:149
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:150
bool is_varlen() const
Definition: sqltypes.h:430
int8_t * numbersPtr
Definition: sqltypes.h:148
virtual void updateStats(const int64_t val, const bool is_null)=0
bool is_array() const
Definition: sqltypes.h:423

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::validate_non_foreign_table_write ( const TableDescriptor *  table_descriptor)
inline

Definition at line 22 of file FsiUtils.h.

References StorageType::FOREIGN_TABLE, and TableDescriptor::storageType.

Referenced by Parser::InsertStmt::analyze(), Parser::InsertIntoTableAsSelectStmt::populateData(), and RelModify::RelModify().

22  {
23  if (table_descriptor && table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
24  throw std::runtime_error{
25  "DELETE, INSERT, OR UPDATE commands are not supported for foreign tables."};
26  }
27 }
std::string storageType
static constexpr char const * FOREIGN_TABLE

+ Here is the caller graph for this function: