OmniSciDB  4201147b46
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
anonymous_namespace{ForeignDataImporter.cpp} Namespace Reference

Functions

std::string get_data_wrapper_type (const import_export::CopyParams &copy_params)
 
ChunkMetadataVector metadata_scan (foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
 
void validate_copy_params (const import_export::CopyParams &copy_params)
 
import_export::ImportStatus import_foreign_data (const ChunkMetadataVector &metadata_vector, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source)
 

Function Documentation

std::string anonymous_namespace{ForeignDataImporter.cpp}::get_data_wrapper_type ( const import_export::CopyParams & copy_params)

Definition at line 44 of file ForeignDataImporter.cpp.

References foreign_storage::DataWrapperType::CSV, import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, foreign_storage::DataWrapperType::PARQUET, foreign_storage::DataWrapperType::REGEX_PARSER, import_export::CopyParams::source_type, and UNREACHABLE.

Referenced by import_export::ForeignDataImporter::importGeneral().

// get_data_wrapper_type: maps copy_params.source_type to the name of the
// foreign-storage data wrapper used for the import. The CSV assignment is
// visible below; any source type not matched by a branch hits UNREACHABLE().
// The kParquetFile branch only exists when built with ENABLE_IMPORT_PARQUET.
// NOTE(review): this listing omits source lines 46, 49 and 52. Per the
// References list they presumably open the kDelimitedFile branch and assign
// DataWrapperType::REGEX_PARSER / DataWrapperType::PARQUET — confirm against
// ForeignDataImporter.cpp.
44  {
45  std::string data_wrapper_type;
// (source line 46 not shown in this listing)
47  data_wrapper_type = foreign_storage::DataWrapperType::CSV;
48  } else if (copy_params.source_type == import_export::SourceType::kRegexParsedFile) {
// (source line 49 not shown in this listing)
50 #ifdef ENABLE_IMPORT_PARQUET
51  } else if (copy_params.source_type == import_export::SourceType::kParquetFile) {
// (source line 52 not shown in this listing)
53 #endif
54  } else {
55  UNREACHABLE();
56  }
57  return data_wrapper_type;
58 }
static constexpr char const * REGEX_PARSER
#define UNREACHABLE()
Definition: Logger.h:266
import_export::SourceType source_type
Definition: CopyParams.h:57
static constexpr char const * CSV
static constexpr char const * PARQUET

+ Here is the caller graph for this function:

import_export::ImportStatus anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data ( const ChunkMetadataVector & metadata_vector,
Fragmenter_Namespace::InsertDataLoader::InsertConnector * connector,
Catalog_Namespace::Catalog & catalog,
const TableDescriptor * table,
foreign_storage::ForeignDataWrapper * data_wrapper,
const Catalog_Namespace::SessionInfo * session_info,
const import_export::CopyParams & copy_params,
const std::string &  copy_from_source 
)

Definition at line 90 of file ForeignDataImporter.cpp.

References CHECK, CHECK_GE, CHECK_LE, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, CHUNK_KEY_TABLE_IDX, CHUNK_KEY_VARLEN_IDX, foreign_storage::ForeignDataWrapper::createRenderGroupAnalyzers(), g_enable_assign_render_groups, import_export::CopyParams::geo_assign_render_groups, shared::get_from_map(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getDatabaseId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Fragmenter_Namespace::InsertDataLoader::insertChunks(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, import_export::CopyParams::max_reject, foreign_storage::ForeignDataWrapper::populateChunkBuffers(), import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, import_export::Importer::set_import_status(), and TableDescriptor::tableId.

Referenced by import_export::ForeignDataImporter::importGeneral().

// import_foreign_data: loads the data described by metadata_vector into
// `table`, one fragment at a time, and returns the accumulated ImportStatus.
// metadata_vector must contain at least one key (CHECK_GE below).
// For each fragment id from 0 to the largest id found in metadata_vector:
//   1. collect that fragment's chunk keys; for each varlen data key
//      (varlen index 1) the matching index key (varlen index 2) is added too;
//   2. allocate an empty ForeignStorageBuffer per key and have data_wrapper
//      fill them via populateChunkBuffers(), together with a delete buffer;
//   3. wrap the buffers in Chunk objects, skip rows whose delete-buffer byte
//      is non-zero (only when the delete buffer is non-empty), and insert the
//      remaining rows through InsertDataLoader::insertChunks();
//   4. add skipped rows to rows_rejected and valid rows to rows_completed,
//      publishing the status via Importer::set_import_status() under the
//      copy_from_source id.
// Once rows_rejected exceeds copy_params.max_reject, load_failed/load_msg are
// set, the status is published, and the fragment loop stops early.
98  {
99  int32_t max_fragment_id = -1;
100  for (const auto& [key, _] : metadata_vector) {
101  max_fragment_id = std::max(max_fragment_id, key[CHUNK_KEY_FRAGMENT_IDX]);
102  }
103  CHECK_GE(max_fragment_id, 0);
104 
// (source line 105 not shown in this listing; per the References list it
// presumably tests g_enable_assign_render_groups and
// copy_params.geo_assign_render_groups — confirm)
106  // if render group assignment is enabled, tell the wrapper to create any
107  // RenderGroupAnalyzers it may need for any poly columns in the table, if
108  // that wrapper type supports it
109  data_wrapper->createRenderGroupAnalyzers();
110  }
111 
112  import_export::ImportStatus import_status;
113  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
114 
115  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
116  // gather applicable keys to load for fragment
117  std::set<ChunkKey> fragment_keys;
118  for (const auto& [key, _] : metadata_vector) {
119  if (key[CHUNK_KEY_FRAGMENT_IDX] == fragment_id) {
120  fragment_keys.insert(key);
121 
122  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
123  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
124  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
// NOTE(review): col_desc is dereferenced without a null check here and in
// the chunk-creation loop; presumably every key's column exists in the
// catalog — confirm.
125  if (col_desc->columnType.is_varlen_indeed()) {
126  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX);
127  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data chunk
128  auto index_key = key;
129  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
130  fragment_keys.insert(index_key);
131  }
132  }
133  }
134  }
135 
136  // create buffers
137  std::map<ChunkKey, std::unique_ptr<foreign_storage::ForeignStorageBuffer>>
138  fragment_buffers_owner;
139  foreign_storage::ChunkToBufferMap fragment_buffers;
140  auto delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
141  for (const auto& key : fragment_keys) {
142  fragment_buffers_owner[key] =
143  std::make_unique<foreign_storage::ForeignStorageBuffer>();
144  fragment_buffers_owner[key]->resetToEmpty();
145  fragment_buffers[key] = shared::get_from_map(fragment_buffers_owner, key).get();
146  }
147 
148  // get chunks for import
// (source line 149 not shown in this listing; it presumably begins the
// declaration of `insert_chunks`, which line 150 completes and the code
// below uses — confirm)
150  table->tableId, catalog.getDatabaseId(), {}, {}};
151 
152  // get the buffers
153  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
154 
155  // create chunks from buffers
156  for (const auto& [key, buffer] : fragment_buffers) {
157  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
158  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
159  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
160 
161  if (col_desc->columnType.is_varlen_indeed()) {
162  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
163  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
164  auto index_key = key;
165  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
166  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
167  col_desc, buffer, shared::get_from_map(fragment_buffers, index_key), false);
168  }
169  } else { // regular non-varlen case with no index buffer
170  insert_chunks.chunks[col_id] =
171  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
172  }
173  }
174 
175  // mark which row indices are valid for import
// NOTE(review): fragment_buffers.begin() is dereferenced unconditionally. If
// no metadata key matched this fragment_id (i.e. fragment ids are not
// contiguous), the map is empty and this is undefined behavior — consider
// skipping empty fragments.
176  auto row_count = fragment_buffers.begin()
177  ->second->getEncoder()
178  ->getNumElems(); // assume all chunks have same number of rows,
179  // this is validated at a lower level
180  insert_chunks.valid_row_indices.reserve(row_count);
181  for (size_t irow = 0; irow < row_count; ++irow) {
182  if (delete_buffer->size() > 0) {
// NOTE(review): irow indexes delete_buffer's memory on the next line, so
// CHECK_LT(irow, delete_buffer->size()) appears to be the intended bound.
// CHECK_LE admits irow == size(), which is one past the end (assuming one
// byte per row — confirm).
183  CHECK_LE(irow, delete_buffer->size());
184  if (delete_buffer->getMemoryPtr()[irow]) {
185  continue;
186  }
187  }
188  insert_chunks.valid_row_indices.emplace_back(irow);
189  }
190 
191  // import chunks
192  insert_data_loader.insertChunks(*session_info, insert_chunks);
193 
194  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
195  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
196  import_status.rows_completed += insert_chunks.valid_row_indices.size();
197  if (import_status.rows_rejected > copy_params.max_reject) {
198  import_status.load_failed = true;
199  import_status.load_msg = "Load was cancelled due to max reject rows being reached";
200  import_export::Importer::set_import_status(copy_from_source, import_status);
201  break;
202  }
203  import_export::Importer::set_import_status(copy_from_source, import_status);
204  }
205  return import_status;
206 }
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
#define CHECK_GE(x, y)
Definition: Logger.h:235
bool g_enable_assign_render_groups
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
int getDatabaseId() const
Definition: Catalog.h:283
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
virtual void createRenderGroupAnalyzers()
Create RenderGroupAnalyzers for poly columns.
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:241
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
#define CHECK_LE(x, y)
Definition: Logger.h:233
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:222
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkMetadataVector anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan ( foreign_storage::ForeignDataWrapper * data_wrapper,
foreign_storage::ForeignTable * foreign_table 
)

Definition at line 60 of file ForeignDataImporter.cpp.

References TableDescriptor::maxFragRows, foreign_storage::MetadataScanInfeasibleFragmentSizeException::min_feasible_fragment_size_, and foreign_storage::ForeignDataWrapper::populateChunkMetadata().

Referenced by import_export::ForeignDataImporter::importGeneral().

// metadata_scan: runs a metadata scan on data_wrapper (populateChunkMetadata)
// and returns the collected chunk metadata.
// If the first scan throws the exception caught below and that exception
// carries a non-negative min_feasible_fragment_size_, foreign_table->maxFragRows
// is set to that value and the scan is run once more. A negative value
// rethrows the original exception. Exceptions from the retry propagate to
// the caller.
// NOTE(review): metadata_vector is not cleared before the retry in the lines
// shown. If the first scan appended entries before throwing, they would be
// kept alongside the retry's results — confirm the wrapper's behavior.
61  {
62  ChunkMetadataVector metadata_vector;
63  try {
64  data_wrapper->populateChunkMetadata(
65  metadata_vector); // explicitly invoke a metadata scan on data wrapper
// (source line 66 not shown in this listing; per the References list it
// presumably opens the catch clause for
// foreign_storage::MetadataScanInfeasibleFragmentSizeException — confirm)
67  metadata_scan_exception) {
68  // if a metadata scan exception is thrown, check to see if we can adjust
69  // the fragment size and retry
70 
71  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
72  if (min_feasible_fragment_size < 0) {
73  throw; // no valid fragment size returned by exception
74  }
75  foreign_table->maxFragRows = min_feasible_fragment_size;
76  data_wrapper->populateChunkMetadata(
77  metadata_vector); // attempt another metadata scan, note, we assume that the
78  // metadata scan can be reentered safely after throwing the
79  // exception
80  }
81  return metadata_vector;
82 }
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params ( const import_export::CopyParams & copy_params)

Definition at line 84 of file ForeignDataImporter.cpp.

References import_export::kRegexParsedFile, import_export::CopyParams::source_type, and foreign_storage::validate_regex_parser_options().

Referenced by import_export::ForeignDataImporter::importGeneral().

// validate_copy_params: according to this function's References list, it
// reads copy_params.source_type and calls
// foreign_storage::validate_regex_parser_options(). Presumably this happens
// only when the source type is kRegexParsedFile, but the body (source lines
// 85-86) is not shown in this listing — confirm against
// ForeignDataImporter.cpp.
84  {
// (source lines 85-86 not shown in this listing)
87  }
88 }
import_export::SourceType source_type
Definition: CopyParams.h:57
void validate_regex_parser_options(const import_export::CopyParams &copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function: