OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
anonymous_namespace{ForeignDataImporter.cpp} Namespace Reference

Classes

struct  FragmentBuffers
 

Functions

ChunkMetadataVector metadata_scan (foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
 
std::string get_import_id (const import_export::CopyParams &copy_params, const std::string &copy_from_source)
 
void validate_copy_params (const import_export::CopyParams &copy_params)
 
std::unique_ptr< FragmentBufferscreate_fragment_buffers (const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
 
void load_foreign_data_buffers (Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
 
import_export::ImportStatus import_foreign_data (const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
 

Function Documentation

std::unique_ptr<FragmentBuffers> anonymous_namespace{ForeignDataImporter.cpp}::create_fragment_buffers ( const int32_t  fragment_id,
Catalog_Namespace::Catalog catalog,
const TableDescriptor table 
)

Definition at line 94 of file ForeignDataImporter.cpp.

References shared::get_from_map(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), Catalog_Namespace::Catalog::getDatabaseId(), and TableDescriptor::tableId.

Referenced by import_foreign_data().

97  {
98  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);
99 
100  std::set<ChunkKey> fragment_keys;
101  for (const auto col_desc : columns) {
102  ChunkKey key{
103  catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
104  if (col_desc->columnType.is_varlen_indeed()) {
105  auto data_key = key;
106  data_key.push_back(1);
107  fragment_keys.insert(data_key);
108  auto index_key = key;
109  index_key.push_back(2);
110  fragment_keys.insert(index_key);
111  } else {
112  fragment_keys.insert(key);
113  }
114  }
115 
116  // create buffers
117  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
118  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
119  for (const auto& key : fragment_keys) {
120  frag_buffers->fragment_buffers_owner[key] =
121  std::make_unique<foreign_storage::ForeignStorageBuffer>();
122  frag_buffers->fragment_buffers[key] =
123  shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
124  }
125 
126  return frag_buffers;
127 }
std::vector< int > ChunkKey
Definition: types.h:36
int getDatabaseId() const
Definition: Catalog.h:304
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2254

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string anonymous_namespace{ForeignDataImporter.cpp}::get_import_id ( const import_export::CopyParams copy_params,
const std::string &  copy_from_source 
)

Definition at line 67 of file ForeignDataImporter.cpp.

References import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, and import_export::CopyParams::source_type.

Referenced by load_foreign_data_buffers().

68  {
71 #ifdef ENABLE_IMPORT_PARQUET
73 #endif
74  ) {
75  return boost::filesystem::path(copy_from_source).filename().string();
76  }
77 
78  return copy_from_source;
79 }
import_export::SourceType source_type
Definition: CopyParams.h:57

+ Here is the caller graph for this function:

import_export::ImportStatus anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data ( const int32_t  max_fragment_id,
Fragmenter_Namespace::InsertDataLoader::InsertConnector connector,
Catalog_Namespace::Catalog catalog,
const TableDescriptor table,
foreign_storage::ForeignDataWrapper data_wrapper,
const Catalog_Namespace::SessionInfo session_info,
const import_export::CopyParams copy_params,
const std::string &  copy_from_source,
const size_t  maximum_num_fragments_buffered 
)

Definition at line 242 of file ForeignDataImporter.cpp.

References threading_serial::async(), create_fragment_buffers(), foreign_storage::ForeignDataWrapper::createRenderGroupAnalyzers(), g_enable_assign_render_groups, import_export::CopyParams::geo_assign_render_groups, load_foreign_data_buffers(), and foreign_storage::ForeignDataWrapper::populateChunkBuffers().

Referenced by import_export::ForeignDataImporter::importGeneral().

251  {
253  // if render group assignment is enabled, tell the wrapper to create any
254  // RenderGroupAnalyzers it may need for any poly columns in the table, if
255  // that wrapper type supports it
256  data_wrapper->createRenderGroupAnalyzers();
257  }
258 
259  import_export::ImportStatus import_status;
260 
261  std::mutex communication_mutex;
262  bool continue_loading =
263  true; // when false, signals that the last buffer to load has been added, and
264  // loading should cease after loading remaining buffers
265  bool load_failed =
266  false; // signals loading has failed and buffer population should cease
267  bool data_wrapper_error_occured = false; // signals an error occured during buffer
268  // population and loading should cease
269  std::condition_variable buffers_to_load_condition;
270  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;
271 
272  // launch separate thread to load processed fragment buffers
273  auto load_future = std::async(std::launch::async,
275  connector,
276  std::ref(catalog),
277  table,
278  session_info,
279  std::cref(copy_params),
280  std::cref(copy_from_source),
281  std::ref(import_status),
282  std::ref(communication_mutex),
283  std::ref(continue_loading),
284  std::ref(load_failed),
285  std::ref(data_wrapper_error_occured),
286  std::ref(buffers_to_load_condition),
287  std::ref(buffers_to_load));
288 
289  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
290  {
291  std::unique_lock communication_lock(communication_mutex);
292  buffers_to_load_condition.wait(communication_lock, [&]() {
293  return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
294  });
295  if (load_failed) {
296  break;
297  }
298  }
299  auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
300  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
301  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
302 
303  // get the buffers, accounting for the possibility of the requested fragment id being
304  // out of bounds
305  try {
306  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
308  break;
309  } catch (...) {
310  std::unique_lock communication_lock(communication_mutex);
311  data_wrapper_error_occured = true;
312  buffers_to_load_condition.notify_all();
313  throw;
314  }
315 
316  std::unique_lock communication_lock(communication_mutex);
317  buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
318  buffers_to_load_condition.notify_all();
319  }
320 
321  { // data wrapper processing has finished, notify loading thread
322  std::unique_lock communication_lock(communication_mutex);
323  continue_loading = false;
324  buffers_to_load_condition.notify_all();
325  }
326 
327  // any exceptions in separate loading thread will occur here
328  load_future.get();
329 
330  return import_status;
331 }
std::unique_ptr< FragmentBuffers > create_fragment_buffers(const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
std::unique_lock< T > unique_lock
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
void load_foreign_data_buffers(Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
virtual void createRenderGroupAnalyzers()
Create RenderGroupAnalyzers for poly columns.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers ( Fragmenter_Namespace::InsertDataLoader::InsertConnector connector,
Catalog_Namespace::Catalog catalog,
const TableDescriptor table,
const Catalog_Namespace::SessionInfo session_info,
const import_export::CopyParams copy_params,
const std::string &  copy_from_source,
import_export::ImportStatus import_status,
std::mutex &  communication_mutex,
bool &  continue_loading,
bool &  load_failed,
bool &  data_wrapper_error_occured,
std::condition_variable &  buffers_to_load_condition,
std::list< std::unique_ptr< FragmentBuffers >> &  buffers_to_load 
)

Definition at line 129 of file ForeignDataImporter.cpp.

References CHECK, CHECK_LE, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_TABLE_IDX, CHUNK_KEY_VARLEN_IDX, shared::get_from_map(), get_import_id(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getDatabaseId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Fragmenter_Namespace::InsertDataLoader::insertChunks(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, import_export::CopyParams::max_reject, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, import_export::Importer::set_import_status(), and TableDescriptor::tableId.

Referenced by import_foreign_data().

142  {
143  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
144  while (true) {
145  {
146  std::unique_lock communication_lock(communication_mutex);
147  buffers_to_load_condition.wait(communication_lock, [&]() {
148  return !buffers_to_load.empty() || !continue_loading ||
149  data_wrapper_error_occured;
150  });
151  if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
152  return;
153  }
154  }
155 
156  CHECK(!buffers_to_load.empty());
157 
158  try {
159  std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
160  {
161  std::unique_lock communication_lock(communication_mutex);
162  grouped_fragment_buffers.reset(buffers_to_load.front().release());
163  buffers_to_load.pop_front();
164  buffers_to_load_condition.notify_all();
165  }
166  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
167  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
168 
169  // get chunks for import
171  table->tableId, catalog.getDatabaseId(), {}, {}};
172 
173  // create chunks from buffers
174  for (const auto& [key, buffer] : fragment_buffers) {
175  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
176  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
177  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
178 
179  if (col_desc->columnType.is_varlen_indeed()) {
180  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
181  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
182  auto index_key = key;
183  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
184  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
185  col_desc,
186  buffer,
187  shared::get_from_map(fragment_buffers, index_key),
188  false);
189  }
190  } else { // regular non-varlen case with no index buffer
191  insert_chunks.chunks[col_id] =
192  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
193  }
194  }
195 
196  // mark which row indices are valid for import
197  auto row_count = fragment_buffers.begin()
198  ->second->getEncoder()
199  ->getNumElems(); // assume all chunks have same number of
200  // rows, this is validated at a lower level
201  insert_chunks.valid_row_indices.reserve(row_count);
202  for (size_t irow = 0; irow < row_count; ++irow) {
203  if (delete_buffer->size() > 0) {
204  CHECK_LE(irow, delete_buffer->size());
205  if (delete_buffer->getMemoryPtr()[irow]) {
206  continue;
207  }
208  }
209  insert_chunks.valid_row_indices.emplace_back(irow);
210  }
211 
212  // import chunks
213  insert_data_loader.insertChunks(*session_info, insert_chunks);
214 
215  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
216  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
217  import_status.rows_completed += insert_chunks.valid_row_indices.size();
218  if (import_status.rows_rejected > copy_params.max_reject) {
219  import_status.load_failed = true;
220  import_status.load_msg =
221  "Load was cancelled due to max reject rows being reached";
223  get_import_id(copy_params, copy_from_source), import_status);
224  std::unique_lock communication_lock(communication_mutex);
225  load_failed = true;
226  buffers_to_load_condition.notify_all();
227  return;
228  }
230  get_import_id(copy_params, copy_from_source), import_status);
231  } catch (...) {
232  {
233  std::unique_lock communication_lock(communication_mutex);
234  load_failed = true;
235  buffers_to_load_condition.notify_all();
236  }
237  throw;
238  }
239  }
240 }
std::string get_import_id(const import_export::CopyParams &copy_params, const std::string &copy_from_source)
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
int getDatabaseId() const
Definition: Catalog.h:304
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK_LE(x, y)
Definition: Logger.h:300
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:289
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkMetadataVector anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan ( foreign_storage::ForeignDataWrapper data_wrapper,
foreign_storage::ForeignTable foreign_table 
)

Definition at line 43 of file ForeignDataImporter.cpp.

References TableDescriptor::maxFragRows, foreign_storage::MetadataScanInfeasibleFragmentSizeException::min_feasible_fragment_size_, and foreign_storage::ForeignDataWrapper::populateChunkMetadata().

Referenced by import_export::ForeignDataImporter::importGeneral().

44  {
45  ChunkMetadataVector metadata_vector;
46  try {
47  data_wrapper->populateChunkMetadata(
48  metadata_vector); // explicitly invoke a metadata scan on data wrapper
50  metadata_scan_exception) {
51  // if a metadata scan exception is thrown, check to see if we can adjust
52  // the fragment size and retry
53 
54  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
55  if (min_feasible_fragment_size < 0) {
56  throw; // no valid fragment size returned by exception
57  }
58  foreign_table->maxFragRows = min_feasible_fragment_size;
59  data_wrapper->populateChunkMetadata(
60  metadata_vector); // attempt another metadata scan, note, we assume that the
61  // metadata scan can be reentered safely after throwing the
62  // exception
63  }
64  return metadata_vector;
65 }
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params ( const import_export::CopyParams copy_params)

Definition at line 81 of file ForeignDataImporter.cpp.

References import_export::kRegexParsedFile, import_export::CopyParams::source_type, and foreign_storage::validate_regex_parser_options().

Referenced by import_export::ForeignDataImporter::importGeneral().

81  {
84  }
85 }
import_export::SourceType source_type
Definition: CopyParams.h:57
void validate_regex_parser_options(const import_export::CopyParams &copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function: