OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER > Class Template Reference

#include <StorageIOFacility.h>

Classes

struct  DeleteTransactionParameters
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using ExecutorType = typename EXECUTOR_TRAITS::ExecutorType
 
using CatalogType = typename EXECUTOR_TRAITS::CatalogType
 
using FragmentUpdaterType = FRAGMENT_UPDATER
 
using UpdateCallback = typename FragmentUpdaterType::Callback
 
using TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using FragmenterType = Fragmenter_Namespace::InsertOrderFragmenter
 
using TransactionLog = typename FragmenterType::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Public Member Functions

 StorageIOFacility (ExecutorType *executor, CatalogType const &catalog)
 
UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Private Attributes

ExecutorType * executor_
 
CatalogType const & catalog_
 

Detailed Description

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
class StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >

Definition at line 32 of file StorageIOFacility.h.

Member Typedef Documentation

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::CatalogType = typename EXECUTOR_TRAITS::CatalogType

Definition at line 35 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::ColumnValidationFunction = std::function<bool(std::string const&)>

Definition at line 48 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::DeleteVictimOffsetList = std::vector<uint64_t>

Definition at line 40 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::ExecutorType = typename EXECUTOR_TRAITS::ExecutorType

Definition at line 34 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::FragmenterType = Fragmenter_Namespace::InsertOrderFragmenter

Definition at line 45 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::FragmentUpdaterType = FRAGMENT_UPDATER

Definition at line 36 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType

Definition at line 39 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionLog = typename FragmenterType::ModifyTransactionTracker

Definition at line 46 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionLogPtr = std::unique_ptr<TransactionLog>

Definition at line 47 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback = typename FragmentUpdaterType::Callback

Definition at line 37 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetColumnNamesList = std::vector<std::string>

Definition at line 43 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetOffsetList = std::vector<uint64_t>

Definition at line 41 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetTypeList = std::vector<TargetMetaInfo>

Definition at line 42 of file StorageIOFacility.h.

Constructor & Destructor Documentation

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::StorageIOFacility ( ExecutorType * executor,
CatalogType const &  catalog 
)
inline

Definition at line 108 of file StorageIOFacility.h.

109  : executor_(executor), catalog_(catalog) {}
CatalogType const & catalog_
ExecutorType * executor_

Member Function Documentation

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
int StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::normalized_cpu_threads ( ) const
inline private

Definition at line 115 of file StorageIOFacility.h.

115 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:25
template<typename EXECUTOR_TRAITS , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::yieldDeleteCallback ( DeleteTransactionParameters & delete_parameters)

Definition at line 394 of file StorageIOFacility.h.

References catalog_(), CHECK(), CHECK_EQ, Data_Namespace::CPU_LEVEL, Chunk_NS::Chunk::getChunk(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionParameters::getTransactionTracker(), Data_Namespace::GPU_LEVEL, table_is_temporary(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::DeleteTransactionParameters::tableIsTemporary(), and UNLIKELY.

395  {
396  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
397 
398  if (delete_parameters.tableIsTemporary()) {
399  auto callback = [this](FragmentUpdaterType const& update_log) -> void {
400  auto rs = update_log.getResultSet();
401  CHECK(rs->didOutputColumnar());
402  CHECK(rs->isDirectColumnarConversionPossible());
403  CHECK_EQ(rs->colCount(), size_t(1));
404 
405  // Temporary table updates require the full projected column
406  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
407 
408  auto& fragment_info = update_log.getFragmentInfo();
409  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
410  CHECK(td);
411  const auto cd = catalog_.getDeletedColumn(td);
412  CHECK(cd);
413  auto chunk_metadata =
414  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
415  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
416  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
417  td->tableId,
418  cd->columnId,
419  fragment_info.fragmentId};
420  auto chunk = Chunk_NS::Chunk::getChunk(cd,
421  &catalog_.getDataMgr(),
422  chunk_key,
423  Data_Namespace::CPU_LEVEL,
424  0,
425  chunk_metadata->second.numBytes,
426  chunk_metadata->second.numElements);
427  CHECK(chunk);
428  auto chunk_buffer = chunk->get_buffer();
429  CHECK(chunk_buffer && chunk_buffer->has_encoder);
430 
431  auto encoder = chunk_buffer->encoder.get();
432  CHECK(encoder);
433 
434  const auto bytes_width = rs->getPaddedSlotWidthBytes(0);
435 
436  // leverage the encoder to scale column values if the type is encoded (e.g.
437  // DateInDays)
438  const size_t buffer_size = bytes_width * update_log.getRowCount();
439  auto updates_buffer_owned = std::make_unique<char[]>(buffer_size);
440  auto updates_buffer = reinterpret_cast<int8_t*>(updates_buffer_owned.get());
441  rs->copyColumnIntoBuffer(0, updates_buffer, buffer_size);
442 
443  const auto new_chunk_metadata =
444  encoder->appendData(updates_buffer, rs->rowCount(), cd->columnType, false, 0);
445 
446  auto fragmenter = td->fragmenter;
447  CHECK(fragmenter);
448 
449  // The fragmenter copy of the fragment info differs from the copy used by the query
450  // engine. Update metadata in the fragmenter directly.
451  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
452  // TODO: we may want to put this directly in the fragmenter so we are under the
453  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
454  // allowed in this path.
455 
456  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
457  fragment->shadowChunkMetadataMap =
458  fragment->getChunkMetadataMap(); // TODO(adb): needed?
459 
460  auto& data_mgr = catalog_.getDataMgr();
461  if (data_mgr.gpusPresent()) {
462  // flush any GPU copies of the updated chunk
463  data_mgr.deleteChunksWithPrefix(chunk_key,
464  Data_Namespace::GPU_LEVEL);
465  }
466  };
467  return callback;
468  } else {
469  auto callback = [this,
470  &delete_parameters](FragmentUpdaterType const& update_log) -> void {
471  auto entries_per_column = update_log.getEntryCount();
472  auto rows_per_column = update_log.getRowCount();
473  if (rows_per_column == 0) {
474  return;
475  }
476  DeleteVictimOffsetList victim_offsets(rows_per_column);
477 
478  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
479  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
480  auto usable_threads = normalized_cpu_threads();
481 
482  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
483  complete_row_block_size = rows_per_column;
484  partial_row_block_size = 0;
485  usable_threads = 1;
486  }
487 
488  std::atomic<size_t> row_idx{0};
489 
490  auto process_rows = [&update_log, &victim_offsets, &row_idx](
491  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
492  uint64_t entries_processed = 0;
493 
494  for (uint64_t entry_index = entry_start;
495  entry_index < (entry_start + entry_count);
496  entry_index++) {
497  auto const row(update_log.getEntryAt(entry_index));
498 
499  if (row.empty()) {
500  continue;
501  }
502 
503  entries_processed++;
504  size_t row_index = row_idx.fetch_add(1);
505 
506  auto terminal_column_iter = std::prev(row.end());
507  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
508  CHECK(scalar_tv);
509 
510  uint64_t fragment_offset =
511  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
512  victim_offsets[row_index] = fragment_offset;
513  }
514  return entries_processed;
515  };
516 
517  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
518  return thread_index * complete_row_block_size;
519  };
520 
521  RowProcessingFuturesVector row_processing_futures;
522  row_processing_futures.reserve(usable_threads);
523 
524  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
525  row_processing_futures.emplace_back(
526  std::async(std::launch::async,
527  std::forward<decltype(process_rows)>(process_rows),
528  get_row_index(i),
529  complete_row_block_size));
530  }
531  if (partial_row_block_size) {
532  row_processing_futures.emplace_back(
533  std::async(std::launch::async,
534  std::forward<decltype(process_rows)>(process_rows),
535  get_row_index(usable_threads),
536  partial_row_block_size));
537  }
538 
539  uint64_t rows_processed(0);
540  for (auto& t : row_processing_futures) {
541  t.wait();
542  rows_processed += t.get();
543  }
544 
545  auto const* table_descriptor =
546  catalog_.getMetadataForTable(update_log.getPhysicalTableId());
547  CHECK(!table_is_temporary(table_descriptor));
548  auto* fragmenter = table_descriptor->fragmenter;
549  CHECK(fragmenter);
550 
551  auto const* deleted_column_desc = catalog_.getDeletedColumn(table_descriptor);
552  CHECK(deleted_column_desc);
553  fragmenter->updateColumn(&catalog_,
554  table_descriptor,
555  deleted_column_desc,
556  update_log.getFragmentId(),
557  victim_offsets,
558  ScalarTargetValue(int64_t(1L)),
559  update_log.getColumnType(0),
560  Data_Namespace::CPU_LEVEL,
561  delete_parameters.getTransactionTracker());
562  };
563  return callback;
564  }
565 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:35
CatalogType const & catalog_
CHECK(cgen_state)
int normalized_cpu_threads() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define UNLIKELY(x)
Definition: likely.h:20
FRAGMENT_UPDATER FragmentUpdaterType
bool table_is_temporary(const TableDescriptor *const td)
std::vector< uint64_t > DeleteVictimOffsetList
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156

+ Here is the call graph for this function:

template<typename EXECUTOR_TRAITS , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::yieldUpdateCallback ( UpdateTransactionParameters & update_parameters)

Definition at line 123 of file StorageIOFacility.h.

References catalog_(), CHECK(), CHECK_EQ, Data_Namespace::CPU_LEVEL, Chunk_NS::Chunk::getChunk(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTransactionParameters::getTargetsMetaInfo(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionParameters::getTransactionTracker(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTransactionParameters::getUpdateColumnCount(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTransactionParameters::getUpdateColumnNames(), Data_Namespace::GPU_LEVEL, StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTransactionParameters::isVarlenUpdateRequired(), StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTransactionParameters::tableIsTemporary(), and UNLIKELY.

124  {
125  using OffsetVector = std::vector<uint64_t>;
126  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
127  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
128 
129  if (update_parameters.isVarlenUpdateRequired()) {
130  auto callback = [this,
131  &update_parameters](FragmentUpdaterType const& update_log) -> void {
132  std::vector<const ColumnDescriptor*> columnDescriptors;
133  std::vector<TargetMetaInfo> sourceMetaInfos;
134 
135  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size(); idx++) {
136  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
137  auto target_column =
138  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
139  columnDescriptors.push_back(target_column);
140  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
141  }
142 
143  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
144  auto* fragmenter = td->fragmenter;
145  CHECK(fragmenter);
146 
147  fragmenter->updateColumns(
148  &catalog_,
149  td,
150  update_log.getFragmentId(),
151  sourceMetaInfos,
152  columnDescriptors,
153  update_log,
154  update_parameters.getUpdateColumnCount(), // last column of result set
155  Data_Namespace::CPU_LEVEL,
156  update_parameters.getTransactionTracker());
157  };
158  return callback;
159  } else if (update_parameters.tableIsTemporary()) {
160  auto callback = [this,
161  &update_parameters](FragmentUpdaterType const& update_log) -> void {
162  auto rs = update_log.getResultSet();
163  CHECK(rs->didOutputColumnar());
164  CHECK(rs->isDirectColumnarConversionPossible());
165  CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
166  CHECK_EQ(rs->colCount(), size_t(1));
167 
168  // Temporary table updates require the full projected column
169  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
170 
171  auto& fragment_info = update_log.getFragmentInfo();
172  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
173  CHECK(td);
174  const auto cd = catalog_.getMetadataForColumn(
175  td->tableId, update_parameters.getUpdateColumnNames().front());
176  ;
177  CHECK(cd);
178  auto chunk_metadata =
179  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
180  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
181  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
182  td->tableId,
183  cd->columnId,
184  fragment_info.fragmentId};
185  auto chunk = Chunk_NS::Chunk::getChunk(cd,
186  &catalog_.getDataMgr(),
187  chunk_key,
188  Data_Namespace::CPU_LEVEL,
189  0,
190  chunk_metadata->second.numBytes,
191  chunk_metadata->second.numElements);
192  CHECK(chunk);
193  auto chunk_buffer = chunk->get_buffer();
194  CHECK(chunk_buffer && chunk_buffer->has_encoder);
195 
196  auto encoder = chunk_buffer->encoder.get();
197  CHECK(encoder);
198 
199  const auto bytes_width = rs->getPaddedSlotWidthBytes(0);
200 
201  ChunkMetadata new_chunk_metadata;
202  if (cd->columnType.is_dict_encoded_string() &&
203  cd->columnType.get_size() < bytes_width) {
204  // dictionary encoded strings currently use the none-encoder for all types. Scale
205  // the column values appropriately
206 
207  const auto col_bytes_width = cd->columnType.get_size();
208  const size_t buffer_size = col_bytes_width * update_log.getRowCount();
209  auto updates_buffer_owned = std::make_unique<char[]>(buffer_size);
210  auto updates_buffer = reinterpret_cast<int8_t*>(updates_buffer_owned.get());
211 
212  auto rs_buffer_size = bytes_width * update_log.getRowCount();
213  auto rs_buffer_owned = std::make_unique<char[]>(rs_buffer_size);
214  auto rs_buffer = reinterpret_cast<int8_t*>(rs_buffer_owned.get());
215  rs->copyColumnIntoBuffer(0, rs_buffer, rs_buffer_size);
216 
217  // Iterate the result set and copy into the updates buffer
218  auto updates_ptr = updates_buffer;
219  auto rs_ptr = rs_buffer;
220  for (size_t i = 0; i < rs->rowCount(); i++) {
221  std::memcpy(updates_ptr, rs_ptr, col_bytes_width);
222  updates_ptr += col_bytes_width;
223  rs_ptr += bytes_width;
224  }
225 
226  new_chunk_metadata =
227  encoder->appendData(updates_buffer, rs->rowCount(), cd->columnType, false, 0);
228  } else {
229  // leverage the encoder to scale column values if the type is encoded (e.g.
230  // DateInDays)
231  const size_t buffer_size = bytes_width * update_log.getRowCount();
232  auto updates_buffer_owned = std::make_unique<char[]>(buffer_size);
233  auto updates_buffer = reinterpret_cast<int8_t*>(updates_buffer_owned.get());
234  rs->copyColumnIntoBuffer(0, updates_buffer, buffer_size);
235 
236  new_chunk_metadata =
237  encoder->appendData(updates_buffer, rs->rowCount(), cd->columnType, false, 0);
238  }
239 
240  auto fragmenter = td->fragmenter;
241  CHECK(fragmenter);
242 
243  // The fragmenter copy of the fragment info differs from the copy used by the query
244  // engine. Update metadata in the fragmenter directly.
245  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
246  // TODO: we may want to put this directly in the fragmenter so we are under the
247  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
248  // allowed in this path.
249 
250  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
251  fragment->shadowChunkMetadataMap =
252  fragment->getChunkMetadataMap(); // TODO(adb): needed?
253 
254  auto& data_mgr = catalog_.getDataMgr();
255  if (data_mgr.gpusPresent()) {
256  // flush any GPU copies of the updated chunk
257  data_mgr.deleteChunksWithPrefix(chunk_key,
258  Data_Namespace::GPU_LEVEL);
259  }
260  };
261  return callback;
262  } else {
263  auto callback = [this,
264  &update_parameters](FragmentUpdaterType const& update_log) -> void {
265  auto entries_per_column = update_log.getEntryCount();
266  auto rows_per_column = update_log.getRowCount();
267  if (rows_per_column == 0) {
268  return;
269  }
270 
271  OffsetVector column_offsets(rows_per_column);
272  ScalarTargetValueVector scalar_target_values(rows_per_column);
273 
274  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
275  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
276  auto usable_threads = normalized_cpu_threads();
277  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
278  complete_entry_block_size = entries_per_column;
279  partial_row_block_size = 0;
280  usable_threads = 1;
281  }
282 
283  std::atomic<size_t> row_idx{0};
284 
285  auto process_rows = [&update_log,
286  &update_parameters,
287  &column_offsets,
288  &scalar_target_values,
289  &row_idx](auto get_entry_at_func,
290  uint64_t column_index,
291  uint64_t entry_start,
292  uint64_t entry_count) -> uint64_t {
293  uint64_t entries_processed = 0;
294  for (uint64_t entry_index = entry_start;
295  entry_index < (entry_start + entry_count);
296  entry_index++) {
297  const auto& row = get_entry_at_func(entry_index);
298  if (row.empty()) {
299  continue;
300  }
301 
302  entries_processed++;
303  size_t row_index = row_idx.fetch_add(1);
304 
305  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
306 
307  auto terminal_column_iter = std::prev(row.end());
308  const auto frag_offset_scalar_tv =
309  boost::get<ScalarTargetValue>(&*terminal_column_iter);
310  CHECK(frag_offset_scalar_tv);
311 
312  column_offsets[row_index] =
313  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
314  scalar_target_values[row_index] =
315  boost::get<ScalarTargetValue>(row[column_index]);
316  }
317  return entries_processed;
318  };
319 
320  auto get_row_index =
321  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
322  return (thread_index * complete_entry_block_size);
323  };
324 
325  // Iterate over each column
326  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
327  column_index < update_parameters.getUpdateColumnCount();
328  column_index++) {
329  row_idx = 0;
330  RowProcessingFuturesVector entry_processing_futures;
331  entry_processing_futures.reserve(usable_threads);
332 
333  auto get_entry_at_func = [&update_log, &column_index](const size_t entry_index) {
334  if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
335  return update_log.getTranslatedEntryAt(entry_index);
336  } else {
337  return update_log.getEntryAt(entry_index);
338  }
339  };
340 
341  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
342  entry_processing_futures.emplace_back(
343  std::async(std::launch::async,
344  std::forward<decltype(process_rows)>(process_rows),
345  get_entry_at_func,
346  column_index,
347  get_row_index(i),
348  complete_entry_block_size));
349  }
350  if (partial_row_block_size) {
351  entry_processing_futures.emplace_back(
352  std::async(std::launch::async,
353  std::forward<decltype(process_rows)>(process_rows),
354  get_entry_at_func,
355  column_index,
356  get_row_index(usable_threads),
357  partial_row_block_size));
358  }
359 
360  uint64_t entries_processed(0);
361  for (auto& t : entry_processing_futures) {
362  t.wait();
363  entries_processed += t.get();
364  }
365 
366  CHECK(row_idx == rows_per_column);
367 
368  const auto table_id = update_log.getPhysicalTableId();
369  auto const* table_descriptor =
370  catalog_.getMetadataForTable(update_log.getPhysicalTableId());
371  CHECK(table_descriptor);
372  const auto fragmenter = table_descriptor->fragmenter;
373  CHECK(fragmenter);
374  auto const* target_column = catalog_.getMetadataForColumn(
375  table_id, update_parameters.getUpdateColumnNames()[column_index]);
376 
377  fragmenter->updateColumn(&catalog_,
378  table_descriptor,
379  target_column,
380  update_log.getFragmentId(),
381  column_offsets,
382  scalar_target_values,
383  update_log.getColumnType(column_index),
384  Data_Namespace::CPU_LEVEL,
385  update_parameters.getTransactionTracker());
386  }
387  };
388  return callback;
389  }
390 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:35
CatalogType const & catalog_
CHECK(cgen_state)
int normalized_cpu_threads() const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define UNLIKELY(x)
Definition: likely.h:20
FRAGMENT_UPDATER FragmentUpdaterType

+ Here is the call graph for this function:

Member Data Documentation

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
CatalogType const& StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::catalog_
private

Definition at line 118 of file StorageIOFacility.h.

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ExecutorType* StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::executor_
private

Definition at line 117 of file StorageIOFacility.h.


The documentation for this class was generated from the following file: