OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
anonymous_namespace{ForeignDataImporter.cpp} Namespace Reference

Classes

struct  FragmentBuffers
 

Functions

ChunkMetadataVector metadata_scan (foreign_storage::ForeignDataWrapper *data_wrapper, foreign_storage::ForeignTable *foreign_table)
 
std::string get_import_id (const import_export::CopyParams &copy_params, const std::string &copy_from_source)
 
void validate_copy_params (const import_export::CopyParams &copy_params)
 
std::unique_ptr< FragmentBuffers > create_fragment_buffers (const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
 
void load_foreign_data_buffers (Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
 
import_export::ImportStatus import_foreign_data (const int32_t max_fragment_id, Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, foreign_storage::ForeignDataWrapper *data_wrapper, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, const size_t maximum_num_fragments_buffered)
 

Function Documentation

std::unique_ptr<FragmentBuffers> anonymous_namespace{ForeignDataImporter.cpp}::create_fragment_buffers ( const int32_t  fragment_id,
Catalog_Namespace::Catalog &  catalog,
const TableDescriptor *  table 
)

Definition at line 95 of file ForeignDataImporter.cpp.

References shared::get_from_map(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), Catalog_Namespace::Catalog::getDatabaseId(), and TableDescriptor::tableId.

Referenced by import_foreign_data().

98  {
99  auto columns = catalog.getAllColumnMetadataForTable(table->tableId, false, false, true);
100 
101  std::set<ChunkKey> fragment_keys;
102  for (const auto col_desc : columns) {
103  ChunkKey key{
104  catalog.getDatabaseId(), table->tableId, col_desc->columnId, fragment_id};
105  if (col_desc->columnType.is_varlen_indeed()) {
106  auto data_key = key;
107  data_key.push_back(1);
108  fragment_keys.insert(data_key);
109  auto index_key = key;
110  index_key.push_back(2);
111  fragment_keys.insert(index_key);
112  } else {
113  fragment_keys.insert(key);
114  }
115  }
116 
117  // create buffers
118  std::unique_ptr<FragmentBuffers> frag_buffers = std::make_unique<FragmentBuffers>();
119  frag_buffers->delete_buffer = std::make_unique<foreign_storage::ForeignStorageBuffer>();
120  for (const auto& key : fragment_keys) {
121  frag_buffers->fragment_buffers_owner[key] =
122  std::make_unique<foreign_storage::ForeignStorageBuffer>();
123  frag_buffers->fragment_buffers[key] =
124  shared::get_from_map(frag_buffers->fragment_buffers_owner, key).get();
125  }
126 
127  return frag_buffers;
128 }
std::vector< int > ChunkKey
Definition: types.h:36
int getDatabaseId() const
Definition: Catalog.h:304
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:2267

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string anonymous_namespace{ForeignDataImporter.cpp}::get_import_id ( const import_export::CopyParams &  copy_params,
const std::string &  copy_from_source 
)

Definition at line 68 of file ForeignDataImporter.cpp.

References import_export::kDelimitedFile, import_export::kParquetFile, import_export::kRegexParsedFile, and import_export::CopyParams::source_type.

Referenced by load_foreign_data_buffers().

69  {
72 #ifdef ENABLE_IMPORT_PARQUET
74 #endif
75  ) {
76  return boost::filesystem::path(copy_from_source).filename().string();
77  }
78 
79  return copy_from_source;
80 }
import_export::SourceType source_type
Definition: CopyParams.h:57

+ Here is the caller graph for this function:

import_export::ImportStatus anonymous_namespace{ForeignDataImporter.cpp}::import_foreign_data ( const int32_t  max_fragment_id,
Fragmenter_Namespace::InsertDataLoader::InsertConnector *  connector,
Catalog_Namespace::Catalog &  catalog,
const TableDescriptor *  table,
foreign_storage::ForeignDataWrapper *  data_wrapper,
const Catalog_Namespace::SessionInfo *  session_info,
const import_export::CopyParams &  copy_params,
const std::string &  copy_from_source,
const size_t  maximum_num_fragments_buffered 
)

Definition at line 243 of file ForeignDataImporter.cpp.

References threading_serial::async(), create_fragment_buffers(), foreign_storage::ForeignDataWrapper::createRenderGroupAnalyzers(), g_enable_assign_render_groups, import_export::CopyParams::geo_assign_render_groups, load_foreign_data_buffers(), and foreign_storage::ForeignDataWrapper::populateChunkBuffers().

Referenced by import_export::ForeignDataImporter::importGeneral().

252  {
253  if (g_enable_assign_render_groups && copy_params.geo_assign_render_groups) {
254  // if render group assignment is enabled, tell the wrapper to create any
255  // RenderGroupAnalyzers it may need for any poly columns in the table, if
256  // that wrapper type supports it
257  data_wrapper->createRenderGroupAnalyzers();
258  }
259 
260  import_export::ImportStatus import_status;
261 
262  std::mutex communication_mutex;
263  bool continue_loading =
264  true; // when false, signals that the last buffer to load has been added, and
265  // loading should cease after loading remaining buffers
266  bool load_failed =
267  false; // signals loading has failed and buffer population should cease
268  bool data_wrapper_error_occured = false; // signals an error occured during buffer
269  // population and loading should cease
270  std::condition_variable buffers_to_load_condition;
271  std::list<std::unique_ptr<FragmentBuffers>> buffers_to_load;
272 
273  // launch separate thread to load processed fragment buffers
274  auto load_future = std::async(std::launch::async,
275  load_foreign_data_buffers,
276  connector,
277  std::ref(catalog),
278  table,
279  session_info,
280  std::cref(copy_params),
281  std::cref(copy_from_source),
282  std::ref(import_status),
283  std::ref(communication_mutex),
284  std::ref(continue_loading),
285  std::ref(load_failed),
286  std::ref(data_wrapper_error_occured),
287  std::ref(buffers_to_load_condition),
288  std::ref(buffers_to_load));
289 
290  for (int32_t fragment_id = 0; fragment_id <= max_fragment_id; ++fragment_id) {
291  {
292  std::unique_lock communication_lock(communication_mutex);
293  buffers_to_load_condition.wait(communication_lock, [&]() {
294  return buffers_to_load.size() < maximum_num_fragments_buffered || load_failed;
295  });
296  if (load_failed) {
297  break;
298  }
299  }
300  auto grouped_fragment_buffers = create_fragment_buffers(fragment_id, catalog, table);
301  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
302  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
303 
304  // get the buffers, accounting for the possibility of the requested fragment id being
305  // out of bounds
306  try {
307  data_wrapper->populateChunkBuffers(fragment_buffers, {}, delete_buffer.get());
309  break;
310  } catch (...) {
311  std::unique_lock communication_lock(communication_mutex);
312  data_wrapper_error_occured = true;
313  buffers_to_load_condition.notify_all();
314  throw;
315  }
316 
317  std::unique_lock communication_lock(communication_mutex);
318  buffers_to_load.emplace_back(std::move(grouped_fragment_buffers));
319  buffers_to_load_condition.notify_all();
320  }
321 
322  { // data wrapper processing has finished, notify loading thread
323  std::unique_lock communication_lock(communication_mutex);
324  continue_loading = false;
325  buffers_to_load_condition.notify_all();
326  }
327 
328  // any exceptions in separate loading thread will occur here
329  load_future.get();
330 
331  return import_status;
332 }
std::unique_ptr< FragmentBuffers > create_fragment_buffers(const int32_t fragment_id, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table)
future< Result > async(Fn &&fn, Args &&...args)
bool g_enable_assign_render_groups
std::unique_lock< T > unique_lock
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer=nullptr)=0
void load_foreign_data_buffers(Fragmenter_Namespace::InsertDataLoader::InsertConnector *connector, Catalog_Namespace::Catalog &catalog, const TableDescriptor *table, const Catalog_Namespace::SessionInfo *session_info, const import_export::CopyParams &copy_params, const std::string &copy_from_source, import_export::ImportStatus &import_status, std::mutex &communication_mutex, bool &continue_loading, bool &load_failed, bool &data_wrapper_error_occured, std::condition_variable &buffers_to_load_condition, std::list< std::unique_ptr< FragmentBuffers >> &buffers_to_load)
virtual void createRenderGroupAnalyzers()
Create RenderGroupAnalyzers for poly columns.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{ForeignDataImporter.cpp}::load_foreign_data_buffers ( Fragmenter_Namespace::InsertDataLoader::InsertConnector *  connector,
Catalog_Namespace::Catalog &  catalog,
const TableDescriptor *  table,
const Catalog_Namespace::SessionInfo *  session_info,
const import_export::CopyParams &  copy_params,
const std::string &  copy_from_source,
import_export::ImportStatus &  import_status,
std::mutex &  communication_mutex,
bool &  continue_loading,
bool &  load_failed,
bool &  data_wrapper_error_occured,
std::condition_variable &  buffers_to_load_condition,
std::list< std::unique_ptr< FragmentBuffers >> &  buffers_to_load 
)

Definition at line 130 of file ForeignDataImporter.cpp.

References CHECK, CHECK_LE, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_TABLE_IDX, CHUNK_KEY_VARLEN_IDX, shared::get_from_map(), get_import_id(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getDatabaseId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Fragmenter_Namespace::InsertDataLoader::insertChunks(), import_export::ImportStatus::load_failed, import_export::ImportStatus::load_msg, import_export::CopyParams::max_reject, import_export::ImportStatus::rows_completed, import_export::ImportStatus::rows_rejected, import_export::Importer::set_import_status(), and TableDescriptor::tableId.

Referenced by import_foreign_data().

143  {
144  Fragmenter_Namespace::InsertDataLoader insert_data_loader(*connector);
145  while (true) {
146  {
147  std::unique_lock communication_lock(communication_mutex);
148  buffers_to_load_condition.wait(communication_lock, [&]() {
149  return !buffers_to_load.empty() || !continue_loading ||
150  data_wrapper_error_occured;
151  });
152  if ((buffers_to_load.empty() && !continue_loading) || data_wrapper_error_occured) {
153  return;
154  }
155  }
156 
157  CHECK(!buffers_to_load.empty());
158 
159  try {
160  std::unique_ptr<FragmentBuffers> grouped_fragment_buffers;
161  {
162  std::unique_lock communication_lock(communication_mutex);
163  grouped_fragment_buffers.reset(buffers_to_load.front().release());
164  buffers_to_load.pop_front();
165  buffers_to_load_condition.notify_all();
166  }
167  auto& fragment_buffers = grouped_fragment_buffers->fragment_buffers;
168  auto& delete_buffer = grouped_fragment_buffers->delete_buffer;
169 
170  // get chunks for import
172  table->tableId, catalog.getDatabaseId(), {}, {}};
173 
174  // create chunks from buffers
175  for (const auto& [key, buffer] : fragment_buffers) {
176  const auto col_id = key[CHUNK_KEY_COLUMN_IDX];
177  const auto table_id = key[CHUNK_KEY_TABLE_IDX];
178  const auto col_desc = catalog.getMetadataForColumn(table_id, col_id);
179 
180  if (col_desc->columnType.is_varlen_indeed()) {
181  CHECK(key.size() > CHUNK_KEY_VARLEN_IDX); // check for varlen key
182  if (key[CHUNK_KEY_VARLEN_IDX] == 1) { // data key
183  auto index_key = key;
184  index_key[CHUNK_KEY_VARLEN_IDX] = 2;
185  insert_chunks.chunks[col_id] = Chunk_NS::Chunk::getChunk(
186  col_desc,
187  buffer,
188  shared::get_from_map(fragment_buffers, index_key),
189  false);
190  }
191  } else { // regular non-varlen case with no index buffer
192  insert_chunks.chunks[col_id] =
193  Chunk_NS::Chunk::getChunk(col_desc, buffer, nullptr, false);
194  }
195  }
196 
197  // mark which row indices are valid for import
198  auto row_count = fragment_buffers.begin()
199  ->second->getEncoder()
200  ->getNumElems(); // assume all chunks have same number of
201  // rows, this is validated at a lower level
202  insert_chunks.valid_row_indices.reserve(row_count);
203  for (size_t irow = 0; irow < row_count; ++irow) {
204  if (delete_buffer->size() > 0) {
205  CHECK_LE(irow, delete_buffer->size());
206  if (delete_buffer->getMemoryPtr()[irow]) {
207  continue;
208  }
209  }
210  insert_chunks.valid_row_indices.emplace_back(irow);
211  }
212 
213  // import chunks
214  insert_data_loader.insertChunks(*session_info, insert_chunks);
215 
216  CHECK_LE(insert_chunks.valid_row_indices.size(), row_count);
217  import_status.rows_rejected += row_count - insert_chunks.valid_row_indices.size();
218  import_status.rows_completed += insert_chunks.valid_row_indices.size();
219  if (import_status.rows_rejected > copy_params.max_reject) {
220  import_status.load_failed = true;
221  import_status.load_msg =
222  "Load was cancelled due to max reject rows being reached";
223  import_export::Importer::set_import_status(
224  get_import_id(copy_params, copy_from_source), import_status);
225  std::unique_lock communication_lock(communication_mutex);
226  load_failed = true;
227  buffers_to_load_condition.notify_all();
228  return;
229  }
230  import_export::Importer::set_import_status(
231  get_import_id(copy_params, copy_from_source), import_status);
232  } catch (...) {
233  {
234  std::unique_lock communication_lock(communication_mutex);
235  load_failed = true;
236  buffers_to_load_condition.notify_all();
237  }
238  throw;
239  }
240  }
241 }
std::string get_import_id(const import_export::CopyParams &copy_params, const std::string &copy_from_source)
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
int getDatabaseId() const
Definition: Catalog.h:304
static void set_import_status(const std::string &id, const ImportStatus is)
Definition: Importer.cpp:239
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define CHECK(condition)
Definition: Logger.h:291
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ChunkMetadataVector anonymous_namespace{ForeignDataImporter.cpp}::metadata_scan ( foreign_storage::ForeignDataWrapper *  data_wrapper,
foreign_storage::ForeignTable *  foreign_table 
)

Definition at line 44 of file ForeignDataImporter.cpp.

References TableDescriptor::maxFragRows, foreign_storage::MetadataScanInfeasibleFragmentSizeException::min_feasible_fragment_size_, and foreign_storage::ForeignDataWrapper::populateChunkMetadata().

Referenced by import_export::ForeignDataImporter::importGeneral().

45  {
46  ChunkMetadataVector metadata_vector;
47  try {
48  data_wrapper->populateChunkMetadata(
49  metadata_vector); // explicitly invoke a metadata scan on data wrapper
50  } catch (const foreign_storage::MetadataScanInfeasibleFragmentSizeException&
51  metadata_scan_exception) {
52  // if a metadata scan exception is thrown, check to see if we can adjust
53  // the fragment size and retry
54 
55  auto min_feasible_fragment_size = metadata_scan_exception.min_feasible_fragment_size_;
56  if (min_feasible_fragment_size < 0) {
57  throw; // no valid fragment size returned by exception
58  }
59  foreign_table->maxFragRows = min_feasible_fragment_size;
60  data_wrapper->populateChunkMetadata(
61  metadata_vector); // attempt another metadata scan, note, we assume that the
62  // metadata scan can be reentered safely after throwing the
63  // exception
64  }
65  return metadata_vector;
66 }
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector)=0

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void anonymous_namespace{ForeignDataImporter.cpp}::validate_copy_params ( const import_export::CopyParams &  copy_params)

Definition at line 82 of file ForeignDataImporter.cpp.

References import_export::kRegexParsedFile, import_export::CopyParams::source_type, and foreign_storage::validate_regex_parser_options().

Referenced by import_export::ForeignDataImporter::importGeneral().

82  {
85  }
86 }
import_export::SourceType source_type
Definition: CopyParams.h:57
void validate_regex_parser_options(const import_export::CopyParams &copy_params)

+ Here is the call graph for this function:

+ Here is the caller graph for this function: