OmniSciDB  29e35f4d58
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER > Class Template Reference

#include <StorageIOFacility.h>

Classes

struct  DeleteTransactionParameters
 
struct  MethodSelector
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using ExecutorType = typename EXECUTOR_TRAITS::ExecutorType
 
using CatalogType = typename EXECUTOR_TRAITS::CatalogType
 
using FragmentUpdaterType = FRAGMENT_UPDATER
 
using UpdateCallback = typename FragmentUpdaterType::Callback
 
using IOFacility = IO_FACET
 
using TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType
 
using DeleteVictimOffsetList = typename IOFacility::DeleteVictimOffsetList
 
using UpdateTargetOffsetList = typename IOFacility::UpdateTargetOffsetList
 
using UpdateTargetTypeList = typename IOFacility::UpdateTargetTypeList
 
using UpdateTargetColumnNamesList = typename IOFacility::UpdateTargetColumnNamesList
 
using UpdateTargetColumnNameType = typename UpdateTargetColumnNamesList::value_type
 
using ColumnValidationFunction = typename IOFacility::ColumnValidationFunction
 
using StringSelector = Experimental::MetaTypeClass< Experimental::String >
 
using NonStringSelector = Experimental::UncapturedMetaTypeClass
 

Public Member Functions

 StorageIOFacility (ExecutorType *executor, CatalogType const &catalog)
 
ColumnValidationFunction yieldColumnValidator (TableDescriptorType const *table_descriptor)
 
UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Private Attributes

ExecutorType * executor_
 
CatalogType const & catalog_
 

Detailed Description

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
class StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >

Definition at line 103 of file StorageIOFacility.h.

Member Typedef Documentation

◆ CatalogType

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::CatalogType = typename EXECUTOR_TRAITS::CatalogType

Definition at line 106 of file StorageIOFacility.h.

◆ ColumnValidationFunction

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::ColumnValidationFunction = typename IOFacility::ColumnValidationFunction

Definition at line 116 of file StorageIOFacility.h.

◆ DeleteVictimOffsetList

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::DeleteVictimOffsetList = typename IOFacility::DeleteVictimOffsetList

Definition at line 111 of file StorageIOFacility.h.

◆ ExecutorType

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::ExecutorType = typename EXECUTOR_TRAITS::ExecutorType

Definition at line 105 of file StorageIOFacility.h.

◆ FragmentUpdaterType

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::FragmentUpdaterType = FRAGMENT_UPDATER

Definition at line 107 of file StorageIOFacility.h.

◆ IOFacility

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::IOFacility = IO_FACET

Definition at line 109 of file StorageIOFacility.h.

◆ NonStringSelector

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::NonStringSelector = Experimental::UncapturedMetaTypeClass

Definition at line 119 of file StorageIOFacility.h.

◆ StringSelector

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::StringSelector = Experimental::MetaTypeClass<Experimental::String>

Definition at line 118 of file StorageIOFacility.h.

◆ TableDescriptorType

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType

Definition at line 110 of file StorageIOFacility.h.

◆ UpdateCallback

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback = typename FragmentUpdaterType::Callback

Definition at line 108 of file StorageIOFacility.h.

◆ UpdateTargetColumnNamesList

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetColumnNamesList = typename IOFacility::UpdateTargetColumnNamesList

Definition at line 114 of file StorageIOFacility.h.

◆ UpdateTargetColumnNameType

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetColumnNameType = typename UpdateTargetColumnNamesList::value_type

Definition at line 115 of file StorageIOFacility.h.

◆ UpdateTargetOffsetList

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetOffsetList = typename IOFacility::UpdateTargetOffsetList

Definition at line 112 of file StorageIOFacility.h.

◆ UpdateTargetTypeList

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetTypeList = typename IOFacility::UpdateTargetTypeList

Definition at line 113 of file StorageIOFacility.h.

Constructor & Destructor Documentation

◆ StorageIOFacility()

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::StorageIOFacility ( ExecutorType * executor,
CatalogType const &  catalog 
)
inline

Definition at line 184 of file StorageIOFacility.h.

185  : executor_(executor), catalog_(catalog) {}
ExecutorType * executor_
CatalogType const & catalog_

Member Function Documentation

◆ normalized_cpu_threads()

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
int StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::normalized_cpu_threads ( ) const
inline private

Definition at line 196 of file StorageIOFacility.h.

196 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:25

◆ yieldColumnValidator()

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ColumnValidationFunction StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldColumnValidator ( TableDescriptorType const *  table_descriptor)
inline

Definition at line 187 of file StorageIOFacility.h.

188  {
189  return IOFacility::yieldColumnValidator(catalog_, table_descriptor);
190  }
CatalogType const & catalog_

◆ yieldDeleteCallback()

template<typename EXECUTOR_TRAITS , typename IO_FACET , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldDeleteCallback ( DeleteTransactionParameters & delete_parameters)

Definition at line 365 of file StorageIOFacility.h.

366  {
367  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
368 
369  auto callback = [this,
370  &delete_parameters](FragmentUpdaterType const& update_log) -> void {
371  auto entries_per_column = update_log.getEntryCount();
372  auto rows_per_column = update_log.getRowCount();
373  if (rows_per_column == 0) {
374  return;
375  }
376  DeleteVictimOffsetList victim_offsets(rows_per_column);
377 
378  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
379  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
380  auto usable_threads = normalized_cpu_threads();
381 
382  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
383  complete_row_block_size = rows_per_column;
384  partial_row_block_size = 0;
385  usable_threads = 1;
386  }
387 
388  std::atomic<size_t> row_idx{0};
389 
390  auto process_rows = [&update_log, &victim_offsets, &row_idx](
391  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
392  uint64_t entries_processed = 0;
393 
394  for (uint64_t entry_index = entry_start; entry_index < (entry_start + entry_count);
395  entry_index++) {
396  auto const row(update_log.getEntryAt(entry_index));
397 
398  if (row.empty()) {
399  continue;
400  }
401 
402  entries_processed++;
403  size_t row_index = row_idx.fetch_add(1);
404 
405  auto terminal_column_iter = std::prev(row.end());
406  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
407  CHECK(scalar_tv);
408 
409  uint64_t fragment_offset =
410  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
411  victim_offsets[row_index] = fragment_offset;
412  }
413  return entries_processed;
414  };
415 
416  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
417  return thread_index * complete_row_block_size;
418  };
419 
420  RowProcessingFuturesVector row_processing_futures;
421  row_processing_futures.reserve(usable_threads);
422 
423  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
424  row_processing_futures.emplace_back(
425  std::async(std::launch::async,
426  std::forward<decltype(process_rows)>(process_rows),
427  get_row_index(i),
428  complete_row_block_size));
429  }
430  if (partial_row_block_size) {
431  row_processing_futures.emplace_back(
432  std::async(std::launch::async,
433  std::forward<decltype(process_rows)>(process_rows),
434  get_row_index(usable_threads),
435  partial_row_block_size));
436  }
437 
438  uint64_t rows_processed(0);
439  for (auto& t : row_processing_futures) {
440  t.wait();
441  rows_processed += t.get();
442  }
443 
444  IOFacility::deleteColumns(catalog_,
445  update_log.getPhysicalTableId(),
446  update_log.getFragmentId(),
447  victim_offsets,
448  update_log.getColumnType(0),
449  delete_parameters.getTransactionTracker());
450  };
451  return callback;
452 }
typename IOFacility::DeleteVictimOffsetList DeleteVictimOffsetList
FRAGMENT_UPDATER FragmentUpdaterType
int normalized_cpu_threads() const
#define UNLIKELY(x)
Definition: likely.h:20
CatalogType const & catalog_
#define CHECK(condition)
Definition: Logger.h:193

◆ yieldUpdateCallback()

template<typename EXECUTOR_TRAITS , typename IO_FACET , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldUpdateCallback ( UpdateTransactionParameters & update_parameters)

Definition at line 204 of file StorageIOFacility.h.

205  {
206  using OffsetVector = std::vector<uint64_t>;
207  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
208  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
209 
210  if (update_parameters.isVarlenUpdateRequired()) {
211  auto callback = [this,
212  &update_parameters](FragmentUpdaterType const& update_log) -> void {
213  std::vector<const ColumnDescriptor*> columnDescriptors;
214  std::vector<TargetMetaInfo> sourceMetaInfos;
215 
216  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size(); idx++) {
217  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
218  auto target_column =
219  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
220  columnDescriptors.push_back(target_column);
221  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
222  }
223 
224  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
225  auto* fragmenter = td->fragmenter;
226  CHECK(fragmenter);
227 
228  fragmenter->updateColumns(
229  &catalog_,
230  td,
231  update_log.getFragmentId(),
232  sourceMetaInfos,
233  columnDescriptors,
234  update_log,
235  update_parameters.getUpdateColumnCount(), // last column of result set
237  update_parameters.getTransactionTracker());
238  };
239  return callback;
240 
241  } else {
242  auto callback = [this,
243  &update_parameters](FragmentUpdaterType const& update_log) -> void {
244  auto entries_per_column = update_log.getEntryCount();
245  auto rows_per_column = update_log.getRowCount();
246  if (rows_per_column == 0) {
247  return;
248  }
249 
250  OffsetVector column_offsets(rows_per_column);
251  ScalarTargetValueVector scalar_target_values(rows_per_column);
252 
253  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
254  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
255  auto usable_threads = normalized_cpu_threads();
256  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
257  complete_entry_block_size = entries_per_column;
258  partial_row_block_size = 0;
259  usable_threads = 1;
260  }
261 
262  std::atomic<size_t> row_idx{0};
263 
264  auto process_rows = [&update_log,
265  &update_parameters,
266  &column_offsets,
267  &scalar_target_values,
268  &row_idx](auto type_tag,
269  uint64_t column_index,
270  uint64_t entry_start,
271  uint64_t entry_count) -> uint64_t {
272  uint64_t entries_processed = 0;
273  for (uint64_t entry_index = entry_start;
274  entry_index < (entry_start + entry_count);
275  entry_index++) {
276  constexpr auto get_entry_method_sel(MethodSelector::getEntryAt(type_tag));
277  auto const row((update_log.*get_entry_method_sel)(entry_index));
278 
279  if (row.empty()) {
280  continue;
281  }
282 
283  entries_processed++;
284  size_t row_index = row_idx.fetch_add(1);
285 
286  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
287 
288  auto terminal_column_iter = std::prev(row.end());
289  const auto frag_offset_scalar_tv =
290  boost::get<ScalarTargetValue>(&*terminal_column_iter);
291  CHECK(frag_offset_scalar_tv);
292 
293  column_offsets[row_index] =
294  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
295  scalar_target_values[row_index] =
296  boost::get<ScalarTargetValue>(row[column_index]);
297  }
298  return entries_processed;
299  };
300 
301  auto get_row_index =
302  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
303  return (thread_index * complete_entry_block_size);
304  };
305 
306  // Iterate over each column
307  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
308  column_index < update_parameters.getUpdateColumnCount();
309  column_index++) {
310  row_idx = 0;
311  RowProcessingFuturesVector entry_processing_futures;
312  entry_processing_futures.reserve(usable_threads);
313 
314  auto thread_launcher = [&](auto const& type_tag) {
315  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
316  entry_processing_futures.emplace_back(
317  std::async(std::launch::async,
318  std::forward<decltype(process_rows)>(process_rows),
319  type_tag,
320  column_index,
321  get_row_index(i),
322  complete_entry_block_size));
323  }
324  if (partial_row_block_size) {
325  entry_processing_futures.emplace_back(
326  std::async(std::launch::async,
327  std::forward<decltype(process_rows)>(process_rows),
328  type_tag,
329  column_index,
330  get_row_index(usable_threads),
331  partial_row_block_size));
332  }
333  };
334 
335  if (!update_log.getColumnType(column_index).is_string()) {
336  thread_launcher(NonStringSelector());
337  } else {
338  thread_launcher(StringSelector());
339  }
340 
341  uint64_t entries_processed(0);
342  for (auto& t : entry_processing_futures) {
343  t.wait();
344  entries_processed += t.get();
345  }
346 
347  CHECK(row_idx == rows_per_column);
348 
349  IOFacility::updateColumn(catalog_,
350  update_log.getPhysicalTableId(),
351  update_parameters.getUpdateColumnNames()[column_index],
352  update_log.getFragmentId(),
353  column_offsets,
354  scalar_target_values,
355  update_log.getColumnType(column_index),
356  update_parameters.getTransactionTracker());
357  }
358  };
359  return callback;
360  }
361 }
FRAGMENT_UPDATER FragmentUpdaterType
Experimental::MetaTypeClass< Experimental::String > StringSelector
int normalized_cpu_threads() const
#define UNLIKELY(x)
Definition: likely.h:20
Experimental::UncapturedMetaTypeClass NonStringSelector
CatalogType const & catalog_
#define CHECK(condition)
Definition: Logger.h:193
static constexpr auto getEntryAt(StringSelector)

Member Data Documentation

◆ catalog_

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
CatalogType const& StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::catalog_
private

Definition at line 199 of file StorageIOFacility.h.

◆ executor_

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ExecutorType* StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::executor_
private

Definition at line 198 of file StorageIOFacility.h.


The documentation for this class was generated from the following file: