OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp} Namespace Reference

Functions

void throw_fragment_id_out_of_bounds_error (const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
 
std::set< const ColumnDescriptor * > get_columns (const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
 
bool skip_metadata_scan (const ColumnDescriptor *column)
 
void throw_unexpected_number_of_items (const size_t num_expected, const size_t num_loaded, const std::string &item_type, const std::string &foreign_table_name)
 
size_t get_buffer_size (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
 
size_t get_buffer_size (const FileRegions &file_regions)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const FileRegions &file_regions)
 
void resize_delete_buffer (AbstractBuffer *delete_buffer, const size_t chunk_element_count)
 
bool no_deferred_requests (MetadataScanMultiThreadingParams &multi_threading_params)
 
bool is_file_scan_finished (const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
 
foreign_storage::ForeignStorageCache * get_cache_if_enabled (std::shared_ptr< Catalog_Namespace::Catalog > &catalog, const bool disable_cache)
 
void add_placeholder_metadata (const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
 
void initialize_non_append_mode_scan (const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
 

Function Documentation

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata ( const ColumnDescriptor *  column,
const ForeignTable *  foreign_table,
const int  db_id,
const size_t  start_row,
const size_t  total_num_rows,
std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map 
)

Definition at line 1289 of file AbstractTextFileDataWrapper.cpp.

References CHUNK_KEY_FRAGMENT_IDX, ColumnDescriptor::columnId, ColumnDescriptor::columnType, foreign_storage::get_placeholder_metadata(), SQLTypeInfo::is_varlen_indeed(), TableDescriptor::maxFragRows, and TableDescriptor::tableId.

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1295  {
1296  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
1297  if (column->columnType.is_varlen_indeed()) {
1298  chunk_key.emplace_back(1);
1299  }
1300 
1301  // Create placeholder metadata for every fragment touched by this scan
1302  int start_fragment = start_row / foreign_table->maxFragRows;
1303  int end_fragment{0};
1304  if (total_num_rows > 0) {
1305  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
1306  }
1307  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
1308  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
1309  (fragment_id + 1)) > total_num_rows)
1310  ? total_num_rows % foreign_table->maxFragRows
1311  : foreign_table->maxFragRows;
1312 
1313  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
1314  chunk_metadata_map[chunk_key] =
1315  get_placeholder_metadata(column->columnType, num_elements);
1316  }
1317 }
std::vector< int > ChunkKey
Definition: types.h:36
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:254
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:622

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size 
)

Gets the appropriate buffer size to be used when processing file(s).

Definition at line 265 of file AbstractTextFileDataWrapper.cpp.

References import_export::CopyParams::buffer_size.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

267  {
268  size_t buffer_size = copy_params.buffer_size;
269  if (size_known && file_size < buffer_size) {
270  buffer_size = file_size + 1; // +1 for end of line character, if missing
271  }
272  return buffer_size;
273 }
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const FileRegions &  file_regions)

Definition at line 275 of file AbstractTextFileDataWrapper.cpp.

References CHECK.

275  {
276  size_t buffer_size = 0;
277  for (const auto& file_region : file_regions) {
278  buffer_size = std::max(buffer_size, file_region.region_size);
279  }
280  CHECK(buffer_size);
281  return buffer_size;
282 }
#define CHECK(condition)
Definition: Logger.h:289
foreign_storage::ForeignStorageCache* foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_cache_if_enabled ( std::shared_ptr< Catalog_Namespace::Catalog > &  catalog,
const bool  disable_cache 
)

Definition at line 583 of file AbstractTextFileDataWrapper.cpp.

Referenced by foreign_storage::cache_blocks().

585  {
586  if (!disable_cache && catalog->getDataMgr()
587  .getPersistentStorageMgr()
588  ->getDiskCacheConfig()
589  .isEnabledForFSI()) {
590  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
591  } else {
592  return nullptr;
593  }
594 }

+ Here is the caller graph for this function:

std::set<const ColumnDescriptor*> foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns ( const ChunkToBufferMap &  buffers,
const Catalog_Namespace::Catalog &  catalog,
const int32_t  table_id,
const int  fragment_id 
)

Definition at line 84 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, and Catalog_Namespace::Catalog::getMetadataForColumn().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers().

87  {
88  CHECK(!buffers.empty());
89  std::set<const ColumnDescriptor*> columns;
90  for (const auto& entry : buffers) {
91  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
92  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
93  const auto column = catalog.getMetadataForColumn(table_id, column_id);
94  columns.emplace(column);
95  }
96  return columns;
97 }
#define CHECK_EQ(x, y)
Definition: Logger.h:297
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:289
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size,
const size_t  buffer_size 
)

Gets the appropriate number of threads to be used for concurrent processing within the data wrapper.

Definition at line 288 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

291  {
292  size_t thread_count = copy_params.threads;
293  if (thread_count == 0) {
294  thread_count = std::thread::hardware_concurrency();
295  }
296  if (size_known && file_size > 0) {
297  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
298  if (num_buffers_in_file < thread_count) {
299  thread_count = num_buffers_in_file;
300  }
301  }
302  CHECK_GT(thread_count, static_cast<size_t>(0));
303  return thread_count;
304 }
#define CHECK_GT(x, y)
Definition: Logger.h:301
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const FileRegions &  file_regions 
)

Definition at line 306 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

307  {
308  size_t thread_count = copy_params.threads;
309  if (thread_count == 0) {
310  thread_count =
311  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
312  }
313  CHECK_GT(thread_count, static_cast<size_t>(0));
314  return thread_count;
315 }
#define CHECK_GT(x, y)
Definition: Logger.h:301
void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan ( const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map,
const std::map< int, FileRegions > &  fragment_id_to_file_regions_map,
const foreign_storage::OptionsMap &  server_options,
std::unique_ptr< FileReader > &  file_reader,
const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const shared::FilePathOptions &  file_path_options,
const std::optional< size_t > &  max_file_count,
const foreign_storage::ForeignTable *  foreign_table,
const foreign_storage::UserMapping *  user_mapping,
const foreign_storage::TextFileBufferParser &  parser,
std::function< std::string()>  get_s3_key,
size_t &  num_rows,
size_t &  append_start_offset 
)

Definition at line 1319 of file AbstractTextFileDataWrapper.cpp.

References CHECK, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateFiles().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1333  {
1334  // Should only be called once for non-append tables
1335  CHECK(chunk_metadata_map.empty());
1336  CHECK(fragment_id_to_file_regions_map.empty());
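1337  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
1338  ->second ==
1339  AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {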
1340  file_reader = std::make_unique<LocalMultiFileReader>(
1341  file_path, copy_params, file_path_options, max_file_count);
1342  } else {
1343  UNREACHABLE();
1344  }
1345  parser.validateFiles(file_reader.get(), foreign_table);
1346  num_rows = 0;
1347  append_start_offset = 0;
1348 }
virtual void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const =0
#define UNREACHABLE()
Definition: Logger.h:333
#define CHECK(condition)
Definition: Logger.h:289

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished ( const FileReader *  file_reader,
MetadataScanMultiThreadingParams &  multi_threading_params 
)

Definition at line 332 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::FileReader::isScanFinished(), and no_deferred_requests().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

333  {
334  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
335 }
bool no_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::no_deferred_requests ( MetadataScanMultiThreadingParams &  multi_threading_params)

Definition at line 326 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::deferred_requests, and foreign_storage::MetadataScanMultiThreadingParams::deferred_requests_mutex.

Referenced by foreign_storage::dispatch_scan_requests(), and is_file_scan_finished().

326  {
327  std::unique_lock<std::mutex> deferred_requests_lock(
328  multi_threading_params.deferred_requests_mutex);
329  return multi_threading_params.deferred_requests.empty();
330 }

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer ( AbstractBuffer *  delete_buffer,
const size_t  chunk_element_count 
)

Definition at line 317 of file AbstractTextFileDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), and Data_Namespace::AbstractBuffer::size().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunks(), and foreign_storage::update_delete_buffer().

318  {
319  if (delete_buffer->size() < chunk_element_count) {
320  auto remaining_rows = chunk_element_count - delete_buffer->size();
321  std::vector<int8_t> data(remaining_rows, false);
322  delete_buffer->append(data.data(), remaining_rows);
323  }
324 }
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan ( const ColumnDescriptor *  column)

Definition at line 99 of file AbstractTextFileDataWrapper.cpp.

References ColumnDescriptor::columnType, and SQLTypeInfo::is_dict_encoded_type().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::updateMetadata().

99  {
100  return column->columnType.is_dict_encoded_type();
101 }
bool is_dict_encoded_type() const
Definition: sqltypes.h:640
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error ( const TableDescriptor *  table,
const int32_t  fragment_id,
const int32_t  max_fragment_id 
)

Definition at line 74 of file AbstractTextFileDataWrapper.cpp.

References TableDescriptor::tableName, and to_string().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunks().

76  {
77  throw RequestedFragmentIdOutOfBoundsException{
78  "Attempting to populate fragment id " + std::to_string(fragment_id) +
79  " for foreign table " + table->tableName +
80  " which is greater than the maximum fragment id of " +
81  std::to_string(max_fragment_id) + "."};
82 }
std::string tableName
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_unexpected_number_of_items ( const size_t  num_expected,
const size_t  num_loaded,
const std::string &  item_type,
const std::string &  foreign_table_name 
)

Definition at line 197 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::throw_unexpected_number_of_items().

200  {
201  try {
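202  foreign_storage::throw_unexpected_number_of_items(
203  num_expected, num_loaded, item_type);
204  } catch (const foreign_storage::ForeignStorageException& except) {
205  throw foreign_storage::ForeignStorageException(
206  std::string(except.what()) + " Foreign table: " + foreign_table_name);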
207  }
208 }
void throw_unexpected_number_of_items(const size_t &num_expected, const size_t &num_loaded, const std::string &item_type)

+ Here is the call graph for this function: