OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp} Namespace Reference

Functions

void throw_fragment_id_out_of_bounds_error (const TableDescriptor *table, const int32_t fragment_id, const int32_t max_fragment_id)
 
std::set< const
ColumnDescriptor * > 
get_columns (const ChunkToBufferMap &buffers, const Catalog_Namespace::Catalog &catalog, const int32_t table_id, const int fragment_id)
 
bool skip_metadata_scan (const ColumnDescriptor *column)
 
void throw_unexpected_number_of_items (const size_t num_expected, const size_t num_loaded, const std::string &item_type, const std::string &foreign_table_name)
 
size_t get_buffer_size (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size)
 
size_t get_buffer_size (const FileRegions &file_regions)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const bool size_known, const size_t file_size, const size_t buffer_size)
 
size_t get_thread_count (const import_export::CopyParams &copy_params, const FileRegions &file_regions)
 
void resize_delete_buffer (AbstractBuffer *delete_buffer, const size_t chunk_element_count)
 
bool no_deferred_requests (MetadataScanMultiThreadingParams &multi_threading_params)
 
bool is_file_scan_finished (const FileReader *file_reader, MetadataScanMultiThreadingParams &multi_threading_params)
 
foreign_storage::ForeignStorageCache * get_cache_if_enabled (std::shared_ptr< Catalog_Namespace::Catalog > &catalog, const bool disable_cache)
 
void add_placeholder_metadata (const ColumnDescriptor *column, const ForeignTable *foreign_table, const int db_id, const size_t start_row, const size_t total_num_rows, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map)
 
void initialize_non_append_mode_scan (const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< int, FileRegions > &fragment_id_to_file_regions_map, const foreign_storage::OptionsMap &server_options, std::unique_ptr< FileReader > &file_reader, const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &file_path_options, const std::optional< size_t > &max_file_count, const foreign_storage::ForeignTable *foreign_table, const foreign_storage::UserMapping *user_mapping, const foreign_storage::TextFileBufferParser &parser, std::function< std::string()> get_s3_key, size_t &num_rows, size_t &append_start_offset)
 

Function Documentation

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::add_placeholder_metadata ( const ColumnDescriptor *  column,
const ForeignTable *  foreign_table,
const int  db_id,
const size_t  start_row,
const size_t  total_num_rows,
std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map 
)

Definition at line 1168 of file AbstractTextFileDataWrapper.cpp.

References CHUNK_KEY_FRAGMENT_IDX, ColumnDescriptor::columnId, ColumnDescriptor::columnType, foreign_storage::get_placeholder_metadata(), SQLTypeInfo::is_varlen_indeed(), TableDescriptor::maxFragRows, and TableDescriptor::tableId.

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1174  {
1175  ChunkKey chunk_key = {db_id, foreign_table->tableId, column->columnId, 0};
1176  if (column->columnType.is_varlen_indeed()) {
1177  chunk_key.emplace_back(1);
1178  }
1179 
1180  // Create placeholder metadata for every fragment touched by this scan
1181  int start_fragment = start_row / foreign_table->maxFragRows;
1182  int end_fragment{0};
1183  if (total_num_rows > 0) {
1184  end_fragment = (total_num_rows - 1) / foreign_table->maxFragRows;
1185  }
1186  for (int fragment_id = start_fragment; fragment_id <= end_fragment; fragment_id++) {
1187  size_t num_elements = (static_cast<size_t>(foreign_table->maxFragRows *
1188  (fragment_id + 1)) > total_num_rows)
1189  ? total_num_rows % foreign_table->maxFragRows
1190  : foreign_table->maxFragRows;
1191 
1192  chunk_key[CHUNK_KEY_FRAGMENT_IDX] = fragment_id;
1193  chunk_metadata_map[chunk_key] =
1194  get_placeholder_metadata(column->columnType, num_elements);
1195  }
1196 }
std::vector< int > ChunkKey
Definition: types.h:36
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:254
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:646

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size 
)

Gets the appropriate buffer size to be used when processing file(s).

Definition at line 262 of file AbstractTextFileDataWrapper.cpp.

References import_export::CopyParams::buffer_size.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

264  {
265  size_t buffer_size = copy_params.buffer_size;
266  if (size_known && file_size < buffer_size) {
267  buffer_size = file_size + 1; // +1 for end of line character, if missing
268  }
269  return buffer_size;
270 }
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_buffer_size ( const FileRegions &  file_regions)

Definition at line 272 of file AbstractTextFileDataWrapper.cpp.

References CHECK.

272  {
273  size_t buffer_size = 0;
274  for (const auto& file_region : file_regions) {
275  buffer_size = std::max(buffer_size, file_region.region_size);
276  }
277  CHECK(buffer_size);
278  return buffer_size;
279 }
#define CHECK(condition)
Definition: Logger.h:222
foreign_storage::ForeignStorageCache* foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_cache_if_enabled ( std::shared_ptr< Catalog_Namespace::Catalog > &  catalog,
const bool  disable_cache 
)

Definition at line 580 of file AbstractTextFileDataWrapper.cpp.

Referenced by foreign_storage::cache_blocks().

582  {
583  if (!disable_cache && catalog->getDataMgr()
584  .getPersistentStorageMgr()
585  ->getDiskCacheConfig()
586  .isEnabledForFSI()) {
587  return catalog->getDataMgr().getPersistentStorageMgr()->getDiskCache();
588  } else {
589  return nullptr;
590  }
591 }

+ Here is the caller graph for this function:

std::set<const ColumnDescriptor*> foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_columns ( const ChunkToBufferMap &  buffers,
const Catalog_Namespace::Catalog &  catalog,
const int32_t  table_id,
const int  fragment_id 
)

Definition at line 81 of file AbstractTextFileDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, and Catalog_Namespace::Catalog::getMetadataForColumn().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkBuffers().

84  {
85  CHECK(!buffers.empty());
86  std::set<const ColumnDescriptor*> columns;
87  for (const auto& entry : buffers) {
88  CHECK_EQ(fragment_id, entry.first[CHUNK_KEY_FRAGMENT_IDX]);
89  const auto column_id = entry.first[CHUNK_KEY_COLUMN_IDX];
90  const auto column = catalog.getMetadataForColumn(table_id, column_id);
91  columns.emplace(column);
92  }
93  return columns;
94 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:222
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const bool  size_known,
const size_t  file_size,
const size_t  buffer_size 
)

Gets the appropriate number of threads to be used for concurrent processing within the data wrapper.

Definition at line 285 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

288  {
289  size_t thread_count = copy_params.threads;
290  if (thread_count == 0) {
291  thread_count = std::thread::hardware_concurrency();
292  }
293  if (size_known && file_size > 0) {
294  size_t num_buffers_in_file = (file_size + buffer_size - 1) / buffer_size;
295  if (num_buffers_in_file < thread_count) {
296  thread_count = num_buffers_in_file;
297  }
298  }
299  CHECK_GT(thread_count, static_cast<size_t>(0));
300  return thread_count;
301 }
#define CHECK_GT(x, y)
Definition: Logger.h:234
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33

+ Here is the caller graph for this function:

size_t foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::get_thread_count ( const import_export::CopyParams &  copy_params,
const FileRegions &  file_regions 
)

Definition at line 303 of file AbstractTextFileDataWrapper.cpp.

References CHECK_GT, and import_export::CopyParams::threads.

304  {
305  size_t thread_count = copy_params.threads;
306  if (thread_count == 0) {
307  thread_count =
308  std::min<size_t>(std::thread::hardware_concurrency(), file_regions.size());
309  }
310  CHECK_GT(thread_count, static_cast<size_t>(0));
311  return thread_count;
312 }
#define CHECK_GT(x, y)
Definition: Logger.h:234
void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan ( const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &  chunk_metadata_map,
const std::map< int, FileRegions > &  fragment_id_to_file_regions_map,
const foreign_storage::OptionsMap &  server_options,
std::unique_ptr< FileReader > &  file_reader,
const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const shared::FilePathOptions &  file_path_options,
const std::optional< size_t > &  max_file_count,
const foreign_storage::ForeignTable *  foreign_table,
const foreign_storage::UserMapping *  user_mapping,
const foreign_storage::TextFileBufferParser &  parser,
std::function< std::string()>  get_s3_key,
size_t &  num_rows,
size_t &  append_start_offset 
)

Definition at line 1198 of file AbstractTextFileDataWrapper.cpp.

References CHECK, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, UNREACHABLE, and foreign_storage::TextFileBufferParser::validateFiles().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata().

1212  {
1213  // Should only be called once for non-append tables
1214  CHECK(chunk_metadata_map.empty());
1215  CHECK(fragment_id_to_file_regions_map.empty());
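1216  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)
1217  ->second ==
1218  AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE) {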
1219  file_reader = std::make_unique<LocalMultiFileReader>(
1220  file_path, copy_params, file_path_options, max_file_count);
1221  } else {
1222  UNREACHABLE();
1223  }
1224  parser.validateFiles(file_reader.get(), foreign_table);
1225  num_rows = 0;
1226  append_start_offset = 0;
1227 }
virtual void validateFiles(const FileReader *file_reader, const ForeignTable *foreign_table) const =0
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::is_file_scan_finished ( const FileReader *  file_reader,
MetadataScanMultiThreadingParams &  multi_threading_params 
)

Definition at line 329 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::FileReader::isScanFinished(), and no_deferred_requests().

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

330  {
331  return file_reader->isScanFinished() && no_deferred_requests(multi_threading_params);
332 }
bool no_deferred_requests(MetadataScanMultiThreadingParams &multi_threading_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::no_deferred_requests ( MetadataScanMultiThreadingParams &  multi_threading_params)

Definition at line 323 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::MetadataScanMultiThreadingParams::deferred_requests, and foreign_storage::MetadataScanMultiThreadingParams::deferred_requests_mutex.

Referenced by foreign_storage::dispatch_scan_requests(), and is_file_scan_finished().

323  {
324  std::unique_lock<std::mutex> deferred_requests_lock(
325  multi_threading_params.deferred_requests_mutex);
326  return multi_threading_params.deferred_requests.empty();
327 }

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::resize_delete_buffer ( AbstractBuffer *  delete_buffer,
const size_t  chunk_element_count 
)

Definition at line 314 of file AbstractTextFileDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), and Data_Namespace::AbstractBuffer::size().

Referenced by foreign_storage::append_data_block_to_chunk(), and foreign_storage::AbstractTextFileDataWrapper::populateChunks().

315  {
316  if (delete_buffer->size() < chunk_element_count) {
317  auto remaining_rows = chunk_element_count - delete_buffer->size();
318  std::vector<int8_t> data(remaining_rows, false);
319  delete_buffer->append(data.data(), remaining_rows);
320  }
321 }
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::skip_metadata_scan ( const ColumnDescriptor *  column)

Definition at line 96 of file AbstractTextFileDataWrapper.cpp.

References ColumnDescriptor::columnType, and SQLTypeInfo::is_dict_encoded_type().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), and foreign_storage::AbstractTextFileDataWrapper::updateMetadata().

96  {
97  return column->columnType.is_dict_encoded_type();
98 }
bool is_dict_encoded_type() const
Definition: sqltypes.h:664
SQLTypeInfo columnType

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_fragment_id_out_of_bounds_error ( const TableDescriptor *  table,
const int32_t  fragment_id,
const int32_t  max_fragment_id 
)

Definition at line 71 of file AbstractTextFileDataWrapper.cpp.

References TableDescriptor::tableName, and to_string().

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunks().

73  {
74  throw RequestedFragmentIdOutOfBoundsException{
75  "Attempting to populate fragment id " + std::to_string(fragment_id) +
76  " for foreign table " + table->tableName +
77  " which is greater than the maximum fragment id of " +
78  std::to_string(max_fragment_id) + "."};
79 }
std::string tableName
std::string to_string(char const *&&v)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::throw_unexpected_number_of_items ( const size_t  num_expected,
const size_t  num_loaded,
const std::string &  item_type,
const std::string &  foreign_table_name 
)

Definition at line 194 of file AbstractTextFileDataWrapper.cpp.

References foreign_storage::throw_unexpected_number_of_items().

197  {
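198  try {
199  foreign_storage::throw_unexpected_number_of_items(
200  num_expected, num_loaded, item_type);
201  } catch (const foreign_storage::ForeignStorageException& except) {
202  throw foreign_storage::ForeignStorageException(
203  std::string(except.what()) + " Foreign table: " + foreign_table_name);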
204  }
205 }
void throw_unexpected_number_of_items(const size_t &num_expected, const size_t &num_loaded, const std::string &item_type)

+ Here is the call graph for this function: