OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER > Class Template Reference

#include <StorageIOFacility.h>

Classes

struct  DeleteTransactionParameters
 
struct  MethodSelector
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using ExecutorType = typename EXECUTOR_TRAITS::ExecutorType
 
using CatalogType = typename EXECUTOR_TRAITS::CatalogType
 
using FragmentUpdaterType = FRAGMENT_UPDATER
 
using UpdateCallback = typename FragmentUpdaterType::Callback
 
using IOFacility = IO_FACET
 
using TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType
 
using DeleteVictimOffsetList = typename IOFacility::DeleteVictimOffsetList
 
using UpdateTargetOffsetList = typename IOFacility::UpdateTargetOffsetList
 
using UpdateTargetTypeList = typename IOFacility::UpdateTargetTypeList
 
using UpdateTargetColumnNamesList = typename IOFacility::UpdateTargetColumnNamesList
 
using UpdateTargetColumnNameType = typename UpdateTargetColumnNamesList::value_type
 
using ColumnValidationFunction = typename IOFacility::ColumnValidationFunction
 
using StringSelector = Experimental::MetaTypeClass< Experimental::String >
 
using NonStringSelector = Experimental::UncapturedMetaTypeClass
 

Public Member Functions

 StorageIOFacility (ExecutorType *executor, CatalogType const &catalog)
 
ColumnValidationFunction yieldColumnValidator (TableDescriptorType const *table_descriptor)
 
UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Private Attributes

ExecutorTypeexecutor_
 
CatalogType const & catalog_
 

Detailed Description

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
class StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >

Definition at line 102 of file StorageIOFacility.h.

Member Typedef Documentation

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::CatalogType = typename EXECUTOR_TRAITS::CatalogType

Definition at line 105 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::ColumnValidationFunction = typename IOFacility::ColumnValidationFunction

Definition at line 115 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::DeleteVictimOffsetList = typename IOFacility::DeleteVictimOffsetList

Definition at line 110 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::ExecutorType = typename EXECUTOR_TRAITS::ExecutorType

Definition at line 104 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::FragmentUpdaterType = FRAGMENT_UPDATER

Definition at line 106 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::IOFacility = IO_FACET

Definition at line 108 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::NonStringSelector = Experimental::UncapturedMetaTypeClass

Definition at line 118 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::StringSelector = Experimental::MetaTypeClass<Experimental::String>

Definition at line 117 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType

Definition at line 109 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback = typename FragmentUpdaterType::Callback

Definition at line 107 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetColumnNamesList = typename IOFacility::UpdateTargetColumnNamesList

Definition at line 113 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetColumnNameType = typename UpdateTargetColumnNamesList::value_type

Definition at line 114 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetOffsetList = typename IOFacility::UpdateTargetOffsetList

Definition at line 111 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTargetTypeList = typename IOFacility::UpdateTargetTypeList

Definition at line 112 of file StorageIOFacility.h.

Constructor & Destructor Documentation

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::StorageIOFacility ( ExecutorType executor,
CatalogType const &  catalog 
)
inline

Definition at line 179 of file StorageIOFacility.h.

180  : executor_(executor), catalog_(catalog) {}
ExecutorType * executor_
CatalogType const & catalog_

Member Function Documentation

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
int StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::normalized_cpu_threads ( ) const
inlineprivate

Definition at line 191 of file StorageIOFacility.h.

191 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:25
template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ColumnValidationFunction StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldColumnValidator ( TableDescriptorType const *  table_descriptor)
inline

Definition at line 182 of file StorageIOFacility.h.

183  {
184  return IOFacility::yieldColumnValidator(catalog_, table_descriptor);
185  }
CatalogType const & catalog_
template<typename EXECUTOR_TRAITS , typename IO_FACET , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldDeleteCallback ( DeleteTransactionParameters delete_parameters)

Definition at line 360 of file StorageIOFacility.h.

References catalog_(), CHECK(), StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::TransactionParameters::getTransactionTracker(), and UNLIKELY.

361  {
362  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
363 
364  auto callback = [this,
365  &delete_parameters](FragmentUpdaterType const& update_log) -> void {
366  auto entries_per_column = update_log.getEntryCount();
367  auto rows_per_column = update_log.getRowCount();
368  if (rows_per_column == 0) {
369  return;
370  }
371  DeleteVictimOffsetList victim_offsets(rows_per_column);
372 
373  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
374  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
375  auto usable_threads = normalized_cpu_threads();
376 
377  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
378  complete_row_block_size = rows_per_column;
379  partial_row_block_size = 0;
380  usable_threads = 1;
381  }
382 
383  std::atomic<size_t> row_idx{0};
384 
385  auto process_rows = [&update_log, &victim_offsets, &row_idx](
386  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
387  uint64_t entries_processed = 0;
388 
389  for (uint64_t entry_index = entry_start; entry_index < (entry_start + entry_count);
390  entry_index++) {
391  auto const row(update_log.getEntryAt(entry_index));
392 
393  if (row.empty()) {
394  continue;
395  }
396 
397  entries_processed++;
398  size_t row_index = row_idx.fetch_add(1);
399 
400  auto terminal_column_iter = std::prev(row.end());
401  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
402  CHECK(scalar_tv);
403 
404  uint64_t fragment_offset =
405  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
406  victim_offsets[row_index] = fragment_offset;
407  }
408  return entries_processed;
409  };
410 
411  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
412  return thread_index * complete_row_block_size;
413  };
414 
415  RowProcessingFuturesVector row_processing_futures;
416  row_processing_futures.reserve(usable_threads);
417 
418  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
419  row_processing_futures.emplace_back(
420  std::async(std::launch::async,
421  std::forward<decltype(process_rows)>(process_rows),
422  get_row_index(i),
423  complete_row_block_size));
424  }
425  if (partial_row_block_size) {
426  row_processing_futures.emplace_back(
427  std::async(std::launch::async,
428  std::forward<decltype(process_rows)>(process_rows),
429  get_row_index(usable_threads),
430  partial_row_block_size));
431  }
432 
433  uint64_t rows_processed(0);
434  for (auto& t : row_processing_futures) {
435  t.wait();
436  rows_processed += t.get();
437  }
438 
439  IOFacility::deleteColumns(catalog_,
440  update_log.getPhysicalTableId(),
441  update_log.getFragmentId(),
442  victim_offsets,
443  update_log.getColumnType(0),
444  delete_parameters.getTransactionTracker());
445  };
446  return callback;
447 }
int normalized_cpu_threads() const
typename IOFacility::DeleteVictimOffsetList DeleteVictimOffsetList
FRAGMENT_UPDATER FragmentUpdaterType
CHECK(cgen_state)
#define UNLIKELY(x)
Definition: likely.h:20
CatalogType const & catalog_

+ Here is the call graph for this function:

template<typename EXECUTOR_TRAITS , typename IO_FACET , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::yieldUpdateCallback ( UpdateTransactionParameters update_parameters)

Definition at line 199 of file StorageIOFacility.h.

References catalog_(), CHECK(), Data_Namespace::CPU_LEVEL, StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTransactionParameters::getTargetsMetaInfo(), StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::TransactionParameters::getTransactionTracker(), StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTransactionParameters::getUpdateColumnCount(), StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTransactionParameters::getUpdateColumnNames(), StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::UpdateTransactionParameters::isVarlenUpdateRequired(), and UNLIKELY.

200  {
201  using OffsetVector = std::vector<uint64_t>;
202  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
203  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
204 
205  if (update_parameters.isVarlenUpdateRequired()) {
206  auto callback = [this,
207  &update_parameters](FragmentUpdaterType const& update_log) -> void {
208  std::vector<const ColumnDescriptor*> columnDescriptors;
209  std::vector<TargetMetaInfo> sourceMetaInfos;
210 
211  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size(); idx++) {
212  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
213  auto target_column =
214  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
215  columnDescriptors.push_back(target_column);
216  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
217  }
218 
219  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
220  auto* fragmenter = td->fragmenter;
221  CHECK(fragmenter);
222 
223  fragmenter->updateColumns(
224  &catalog_,
225  td,
226  update_log.getFragmentId(),
227  sourceMetaInfos,
228  columnDescriptors,
229  update_log,
230  update_parameters.getUpdateColumnCount(), // last column of result set
232  update_parameters.getTransactionTracker());
233  };
234  return callback;
235 
236  } else {
237  auto callback = [this,
238  &update_parameters](FragmentUpdaterType const& update_log) -> void {
239  auto entries_per_column = update_log.getEntryCount();
240  auto rows_per_column = update_log.getRowCount();
241  if (rows_per_column == 0) {
242  return;
243  }
244 
245  OffsetVector column_offsets(rows_per_column);
246  ScalarTargetValueVector scalar_target_values(rows_per_column);
247 
248  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
249  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
250  auto usable_threads = normalized_cpu_threads();
251  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
252  complete_entry_block_size = entries_per_column;
253  partial_row_block_size = 0;
254  usable_threads = 1;
255  }
256 
257  std::atomic<size_t> row_idx{0};
258 
259  auto process_rows = [&update_log,
260  &update_parameters,
261  &column_offsets,
262  &scalar_target_values,
263  &row_idx](auto type_tag,
264  uint64_t column_index,
265  uint64_t entry_start,
266  uint64_t entry_count) -> uint64_t {
267  uint64_t entries_processed = 0;
268  for (uint64_t entry_index = entry_start;
269  entry_index < (entry_start + entry_count);
270  entry_index++) {
271  constexpr auto get_entry_method_sel(MethodSelector::getEntryAt(type_tag));
272  auto const row((update_log.*get_entry_method_sel)(entry_index));
273 
274  if (row.empty()) {
275  continue;
276  }
277 
278  entries_processed++;
279  size_t row_index = row_idx.fetch_add(1);
280 
281  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
282 
283  auto terminal_column_iter = std::prev(row.end());
284  const auto frag_offset_scalar_tv =
285  boost::get<ScalarTargetValue>(&*terminal_column_iter);
286  CHECK(frag_offset_scalar_tv);
287 
288  column_offsets[row_index] =
289  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
290  scalar_target_values[row_index] =
291  boost::get<ScalarTargetValue>(row[column_index]);
292  }
293  return entries_processed;
294  };
295 
296  auto get_row_index =
297  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
298  return (thread_index * complete_entry_block_size);
299  };
300 
301  // Iterate over each column
302  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
303  column_index < update_parameters.getUpdateColumnCount();
304  column_index++) {
305  row_idx = 0;
306  RowProcessingFuturesVector entry_processing_futures;
307  entry_processing_futures.reserve(usable_threads);
308 
309  auto thread_launcher = [&](auto const& type_tag) {
310  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
311  entry_processing_futures.emplace_back(
312  std::async(std::launch::async,
313  std::forward<decltype(process_rows)>(process_rows),
314  type_tag,
315  column_index,
316  get_row_index(i),
317  complete_entry_block_size));
318  }
319  if (partial_row_block_size) {
320  entry_processing_futures.emplace_back(
321  std::async(std::launch::async,
322  std::forward<decltype(process_rows)>(process_rows),
323  type_tag,
324  column_index,
325  get_row_index(usable_threads),
326  partial_row_block_size));
327  }
328  };
329 
330  if (!update_log.getColumnType(column_index).is_string()) {
331  thread_launcher(NonStringSelector());
332  } else {
333  thread_launcher(StringSelector());
334  }
335 
336  uint64_t entries_processed(0);
337  for (auto& t : entry_processing_futures) {
338  t.wait();
339  entries_processed += t.get();
340  }
341 
342  CHECK(row_idx == rows_per_column);
343 
344  IOFacility::updateColumn(catalog_,
345  update_log.getPhysicalTableId(),
346  update_parameters.getUpdateColumnNames()[column_index],
347  update_log.getFragmentId(),
348  column_offsets,
349  scalar_target_values,
350  update_log.getColumnType(column_index),
351  update_parameters.getTransactionTracker());
352  }
353  };
354  return callback;
355  }
356 }
int normalized_cpu_threads() const
FRAGMENT_UPDATER FragmentUpdaterType
Experimental::MetaTypeClass< Experimental::String > StringSelector
CHECK(cgen_state)
#define UNLIKELY(x)
Definition: likely.h:20
Experimental::UncapturedMetaTypeClass NonStringSelector
CatalogType const & catalog_
static constexpr auto getEntryAt(StringSelector)

+ Here is the call graph for this function:

Member Data Documentation

template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
CatalogType const& StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::catalog_
private
template<typename EXECUTOR_TRAITS, typename IO_FACET = DefaultIOFacet<>, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ExecutorType* StorageIOFacility< EXECUTOR_TRAITS, IO_FACET, FRAGMENT_UPDATER >::executor_
private

Definition at line 193 of file StorageIOFacility.h.


The documentation for this class was generated from the following file: