OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp} Namespace Reference

Functions

void throw_fragment_id_out_of_bounds_error (const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
 
std::set< const
ColumnDescriptor * > 
get_columns (const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
 
bool skip_metadata_scan (const ColumnDescriptor *column)
 
void throw_unexpected_number_of_items (const size_t num_expected, const size_t num_loaded, const std::string &item_type, const std::string &foreign_table_name)
 
size_t get_buffer_size (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
 
size_t get_buffer_size (const FileRegions &file_regions)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const FileRegions &file_regions)
 
void resize_delete_buffer (AbstractBuffer *delete_buffer, const size_t chunk_element_count)
 
bool no_deferred_requests (MetadataScanMultiThreadingParams &multi_threading_params)
 
bool is_file_scan_finished (const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
 
foreign_storage::ForeignStorageCache * get_cache_if_enabled (std::shared_ptr< Catalog_Namespace::Catalog > &catalog, const bool disable_cache)
 
void add_placeholder_metadata (const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
 
void initialize_non_append_mode_scan (const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
 

Function Documentation

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata ( const ColumnDescriptor *  column,
const ForeignTable *  foreign_table,
const int  db_id,
const size_t  start_row,
const size_t  total_num_rows,
std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map 
)

Definition at line 1299 of file AbstractTextFileDataWrapper.cpp.

References CHUNK_KEY_FRAGMENT_IDX, ColumnDescriptor::columnId, ColumnDescriptor::columnType, foreign_storage::get_placeholder_metadata(), SQLTypeInfo::is_varlen_indeed(), TableDescriptor::maxFragRows, and TableDescriptor::tableId.

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1305  {
1306  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
1307  if (column->columnType.is_varlen_indeed()) {
1308  chunk_key.emplace_back(1);
1309  }
1310 
1311  // Create placeholder metadata for every fragment touched by this scan
1312  int start_fragment = start_row / foreign_table->maxFragRows;
1313  int end_fragment{0};
1314  if (total_num_rows > 0) {
1315  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
1316  }
1317  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
1318  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
1319  (fragment_id + 1)) > total_num_rows)
1320  ? total_num_rows % foreign_table->maxFragRows
1321  : foreign_table->maxFragRows;
1322 
1323  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
1324  chunk_metadata_map[chunk_key] =
1325  get_placeholder_metadata(column->columnType, num_elements);
1326  }
1327 }
std::vector< int > ChunkKey
Definition: types.h:36
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:254
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:635

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size 
)

Gets the appropriate buffer size to be used when processing file(s).

Definition at line 264 of file AbstractTextFileDataWrapper.cpp.

References import_export::CopyParams::buffer_size.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

266  {
267  size_t buffer_size = copy_params.buffer_size;
268  if (size_known && file_size < buffer_size) {
269  buffer_size = file_size + 1; // +1 for end of line character, if missing
270  }
271  return buffer_size;
272 }
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const FileRegions &  file_regions)

Definition at line 274 of file AbstractTextFileDataWrapper.cpp.

References CHECK.

274  {
275  size_t buffer_size = 0;
276  for (const auto& file_region : file_regions) {
277  buffer_size = std::max(buffer_size, file_region.region_size);
278  }
279  CHECK(buffer_size);
280  return buffer_size;
281 }
#define CHECK(condition)
Definition: Logger.h:291

foreign_storage::ForeignStorageCache* foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_cache_if_enabled ( std::shared_ptr< Catalog_Namespace::Catalog > &  catalog,
const bool  disable_cache 
)

Definition at line 593 of file AbstractTextFileDataWrapper.cpp.

Referenced by foreign_storage::cache_blocks().

595  {
596  if (!disable_cache && catalog->getDataMgr()
597  .getPersistentStorageMgr()
598  ->getDiskCacheConfig()
599  .isEnabledForFSI()) {
600  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
601  } else {
602  return nullptr;
603  }
604 }

+ Here is the caller graph for this function:

std::set<const ColumnDescriptor*> foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns ( const ChunkToBufferMap &  buffers,
const Catalog_Namespace::Catalog &  catalog,
const int32_t  table_id,
const int  fragment_id 
)

Definition at line 83 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, and Catalog_Namespace::Catalog::getMetadataForColumn().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers().

86  {
87  CHECK(!buffers.empty());
88  std::set<const ColumnDescriptor*> columns;
89  for (const auto& entry : buffers) {
90  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
91  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
92  const auto column = catalog.getMetadataForColumn(table_id, column_id);
93  columns.emplace(column);
94  }
95  return columns;
96 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:291
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size,
const size_t  buffer_size 
)

Gets the appropriate number of threads to be used for concurrent processing within the data wrapper.

Definition at line 287 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

290  {
291  size_t thread_count = copy_params.threads;
292  if (thread_count == 0) {
293  thread_count = std::thread::hardware_concurrency();
294  }
295  if (size_known && file_size > 0) {
296  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
297  if (num_buffers_in_file < thread_count) {
298  thread_count = num_buffers_in_file;
299  }
300  }
301  CHECK_GT(thread_count, static_cast<size_t>(0));
302  return thread_count;
303 }
#define CHECK_GT(x, y)
Definition: Logger.h:305
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const FileRegions &  file_regions 
)

Definition at line 305 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

306  {
307  size_t thread_count = copy_params.threads;
308  if (thread_count == 0) {
309  thread_count =
310  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
311  }
312  CHECK_GT(thread_count, static_cast<size_t>(0));
313  return thread_count;
314 }
#define CHECK_GT(x, y)
Definition: Logger.h:305

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan ( const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map,
const std::map< int, FileRegions > &  fragment_id_to_file_regions_map,
const foreign_storage::OptionsMap &  server_options,
std::unique_ptr< FileReader > &  file_reader,
const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const shared::FilePathOptions &  file_path_options,
const std::optional< size_t > &  max_file_count,
const foreign_storage::ForeignTable *  foreign_table,
const foreign_storage::UserMapping *  user_mapping,
const foreign_storage::TextFileBufferParser &  parser,
std::function< std::string()>  get_s3_key,
size_t &  num_rows,
size_t &  append_start_offset 
)

Definition at line 1329 of file AbstractTextFileDataWrapper.cpp.

References CHECK, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateFiles().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1343  {
1344  // Should only be called once for non-append tables
1345  CHECK(chunk_metadata_map.empty());
1346  CHECK(fragment_id_to_file_regions_map.empty());
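1347  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
1348  ->second ==
1349  AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {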
1350  file_reader = std::make_unique<LocalMultiFileReader>(
1351  file_path, copy_params, file_path_options, max_file_count);
1352  } else {
1353  UNREACHABLE();
1354  }
1355  parser.validateFiles(file_reader.get(), foreign_table);
1356  num_rows = 0;
1357  append_start_offset = 0;
1358 }
virtual void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const =0
#define UNREACHABLE()
Definition: Logger.h:338
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished ( const FileReader *  file_reader,
MetadataScanMultiThreadingParams &  multi_threading_params 
)

Definition at line 331 of file AbstractTextFileDataWrapper.cpp.

References CHECK, foreign_storage::FileReader::isScanFinished(), and no_deferred_requests().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

332  {
333  CHECK(file_reader);
334  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
335 }
bool no_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::no_deferred_requests ( MetadataScanMultiThreadingParams &  multi_threading_params)

Definition at line 325 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::deferred_requests, and foreign_storage::MetadataScanMultiThreadingParams::deferred_requests_mutex.

Referenced by foreign_storage::dispatch_scan_requests(), and is_file_scan_finished().

325  {
326  std::unique_lock<std::mutex> deferred_requests_lock(
327  multi_threading_params.deferred_requests_mutex);
328  return multi_threading_params.deferred_requests.empty();
329 }

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer ( AbstractBuffer *  delete_buffer,
const size_t  chunk_element_count 
)

Definition at line 316 of file AbstractTextFileDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), and Data_Namespace::AbstractBuffer::size().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunks(), and foreign_storage::update_delete_buffer().

317  {
318  if (delete_buffer->size() < chunk_element_count) {
319  auto remaining_rows = chunk_element_count - delete_buffer->size();
320  std::vector<int8_t> data(remaining_rows, false);
321  delete_buffer->append(data.data(), remaining_rows);
322  }
323 }
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan ( const ColumnDescriptor *  column)

Definition at line 98 of file AbstractTextFileDataWrapper.cpp.

References ColumnDescriptor::columnType, and SQLTypeInfo::is_dict_encoded_type().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::updateMetadata().

98  {
99  return column->columnType.is_dict_encoded_type();
100 }
bool is_dict_encoded_type() const
Definition: sqltypes.h:653
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error ( const TableDescriptor *  table,
const int32_t  fragment_id,
const int32_t  max_fragment_id 
)

Definition at line 73 of file AbstractTextFileDataWrapper.cpp.

References TableDescriptor::tableName, and to_string().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunks().

75  {
76  throw RequestedFragmentIdOutOfBoundsException{
77  "Attempting to populate fragment id " + std::to_string(fragment_id) +
78  " for foreign table " + table->tableName +
79  " which is greater than the maximum fragment id of " +
80  std::to_string(max_fragment_id) + "."};
81 }
std::string tableName
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_unexpected_number_of_items ( const size_t  num_expected,
const size_t  num_loaded,
const std::string &  item_type,
const std::string &  foreign_table_name 
)

Definition at line 196 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::throw_unexpected_number_of_items().

199  {
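200  try {
201  foreign_storage::throw_unexpected_number_of_items(
202  num_expected, num_loaded, item_type);
203  } catch (const foreign_storage::ForeignStorageException& except) {
204  throw foreign_storage::ForeignStorageException(
205  std::string(except.what()) + " Foreign table: " + foreign_table_name);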
206  }
207 }
void throw_unexpected_number_of_items(const size_t &num_expected, const size_t &num_loaded, const std::string &item_type)

+ Here is the call graph for this function: