OmniSciDB  8a228a1076
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER > Class Template Reference

#include <StorageIOFacility.h>

Classes

struct  DeleteTransactionParameters
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using ExecutorType = typename EXECUTOR_TRAITS::ExecutorType
 
using CatalogType = typename EXECUTOR_TRAITS::CatalogType
 
using FragmentUpdaterType = FRAGMENT_UPDATER
 
using UpdateCallback = typename FragmentUpdaterType::Callback
 
using TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using FragmenterType = Fragmenter_Namespace::InsertOrderFragmenter
 
using TransactionLog = typename FragmenterType::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Public Member Functions

 StorageIOFacility (ExecutorType *executor, CatalogType const &catalog)
 
UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Static Private Member Functions

static std::unique_ptr< int8_t[]> getRsBufferNoPadding (const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
 

Private Attributes

ExecutorType * executor_
 
CatalogType const & catalog_
 

Detailed Description

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
class StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >

Definition at line 31 of file StorageIOFacility.h.

Member Typedef Documentation

◆ CatalogType

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::CatalogType = typename EXECUTOR_TRAITS::CatalogType

Definition at line 34 of file StorageIOFacility.h.

◆ ColumnValidationFunction

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::ColumnValidationFunction = std::function<bool(std::string const&)>

Definition at line 47 of file StorageIOFacility.h.

◆ DeleteVictimOffsetList

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::DeleteVictimOffsetList = std::vector<uint64_t>

Definition at line 39 of file StorageIOFacility.h.

◆ ExecutorType

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::ExecutorType = typename EXECUTOR_TRAITS::ExecutorType

Definition at line 33 of file StorageIOFacility.h.

◆ FragmenterType

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::FragmenterType = Fragmenter_Namespace::InsertOrderFragmenter

Definition at line 44 of file StorageIOFacility.h.

◆ FragmentUpdaterType

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::FragmentUpdaterType = FRAGMENT_UPDATER

Definition at line 35 of file StorageIOFacility.h.

◆ TableDescriptorType

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType

Definition at line 38 of file StorageIOFacility.h.

◆ TransactionLog

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionLog = typename FragmenterType::ModifyTransactionTracker

Definition at line 45 of file StorageIOFacility.h.

◆ TransactionLogPtr

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::TransactionLogPtr = std::unique_ptr<TransactionLog>

Definition at line 46 of file StorageIOFacility.h.

◆ UpdateCallback

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback = typename FragmentUpdaterType::Callback

Definition at line 36 of file StorageIOFacility.h.

◆ UpdateTargetColumnNamesList

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetColumnNamesList = std::vector<std::string>

Definition at line 42 of file StorageIOFacility.h.

◆ UpdateTargetOffsetList

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetOffsetList = std::vector<uint64_t>

Definition at line 40 of file StorageIOFacility.h.

◆ UpdateTargetTypeList

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
using StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateTargetTypeList = std::vector<TargetMetaInfo>

Definition at line 41 of file StorageIOFacility.h.

Constructor & Destructor Documentation

◆ StorageIOFacility()

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::StorageIOFacility ( ExecutorType * executor,
CatalogType const &  catalog 
)
inline

Definition at line 107 of file StorageIOFacility.h.

108  : executor_(executor), catalog_(catalog) {}
CatalogType const & catalog_
ExecutorType * executor_

Member Function Documentation

◆ getRsBufferNoPadding()

template<typename EXECUTOR_TRAITS , typename FRAGMENT_UPDATER >
std::unique_ptr< int8_t[]> StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::getRsBufferNoPadding ( const ResultSet * rs,
size_t  col_idx,
const SQLTypeInfo & column_type,
size_t  row_count 
)
static private

Definition at line 534 of file StorageIOFacility.h.

Referenced by StorageIOFacility< RelAlgExecutorTraits >::yieldDeleteCallback(), and StorageIOFacility< RelAlgExecutorTraits >::yieldUpdateCallback().

538  {
539  const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
540  const auto type_size = column_type.is_dict_encoded_string()
541  ? column_type.get_size()
542  : column_type.get_logical_size();
543 
544  auto rs_buffer_size = padded_size * row_count;
545  auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
546  rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
547 
548  if (type_size < padded_size) {
549  // else we're going to remove padding and we do it inplace in the same buffer
550  // we can do updates inplace in the same buffer because type_size < padded_size
551  // for some types, like kFLOAT, simple memcpy is not enough
552  auto src_ptr = rs_buffer.get();
553  auto dst_ptr = rs_buffer.get();
554  if (column_type.is_fp()) {
555  CHECK(column_type.get_type() == kFLOAT);
556  CHECK(padded_size == sizeof(double));
557  for (size_t i = 0; i < row_count; i++) {
558  const auto old_val = *reinterpret_cast<double*>(may_alias_ptr(src_ptr));
559  auto new_val = static_cast<float>(old_val);
560  std::memcpy(dst_ptr, &new_val, type_size);
561  dst_ptr += type_size;
562  src_ptr += padded_size;
563  }
564  } else {
565  // otherwise just take first type_size bytes from the padded value
566  for (size_t i = 0; i < row_count; i++) {
567  std::memcpy(dst_ptr, src_ptr, type_size);
568  dst_ptr += type_size;
569  src_ptr += padded_size;
570  }
571  }
572  }
573  return rs_buffer;
574 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
int get_logical_size() const
Definition: sqltypes.h:270
bool is_dict_encoded_string() const
Definition: sqltypes.h:443
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
bool is_fp() const
Definition: sqltypes.h:420
+ Here is the caller graph for this function:

◆ normalized_cpu_threads()

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
int StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::normalized_cpu_threads ( ) const
inline private

Definition at line 114 of file StorageIOFacility.h.

Referenced by StorageIOFacility< RelAlgExecutorTraits >::yieldDeleteCallback(), and StorageIOFacility< RelAlgExecutorTraits >::yieldUpdateCallback().

114 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:24
+ Here is the caller graph for this function:

◆ yieldDeleteCallback()

template<typename EXECUTOR_TRAITS , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::yieldDeleteCallback ( DeleteTransactionParameters & delete_parameters)

Definition at line 362 of file StorageIOFacility.h.

Referenced by StorageIOFacility< RelAlgExecutorTraits >::StorageIOFacility().

363  {
364  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
365 
366  if (delete_parameters.tableIsTemporary()) {
367  auto callback = [this](FragmentUpdaterType const& update_log) -> void {
368  auto rs = update_log.getResultSet();
369  CHECK(rs->didOutputColumnar());
370  CHECK(rs->isDirectColumnarConversionPossible());
371  CHECK_EQ(rs->colCount(), size_t(1));
372 
373  // Temporary table updates require the full projected column
374  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
375 
376  auto& fragment_info = update_log.getFragmentInfo();
377  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
378  CHECK(td);
379  const auto cd = catalog_.getDeletedColumn(td);
380  CHECK(cd);
381  CHECK(cd->columnType.get_type() == kBOOLEAN);
382  auto chunk_metadata =
383  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
384  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
385  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
386  td->tableId,
387  cd->columnId,
388  fragment_info.fragmentId};
389  auto chunk = Chunk_NS::Chunk::getChunk(cd,
390  &catalog_.getDataMgr(),
391  chunk_key,
392  Data_Namespace::MemoryLevel::CPU_LEVEL,
393  0,
394  chunk_metadata->second->numBytes,
395  chunk_metadata->second->numElements);
396  CHECK(chunk);
397  auto chunk_buffer = chunk->getBuffer();
398  CHECK(chunk_buffer && chunk_buffer->has_encoder);
399 
400  auto encoder = chunk_buffer->encoder.get();
401  CHECK(encoder);
402 
403  auto owned_buffer =
404  StorageIOFacility<EXECUTOR_TRAITS, FRAGMENT_UPDATER>::getRsBufferNoPadding(
405  rs.get(), 0, cd->columnType, rs->rowCount());
406  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
407 
408  const auto new_chunk_metadata =
409  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
410 
411  auto fragmenter = td->fragmenter.get();
412  CHECK(fragmenter);
413 
414  // The fragmenter copy of the fragment info differs from the copy used by the query
415  // engine. Update metadata in the fragmenter directly.
416  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
417  // TODO: we may want to put this directly in the fragmenter so we are under the
418  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
419  // allowed in this path.
420 
421  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
422  fragment->shadowChunkMetadataMap =
423  fragment->getChunkMetadataMap(); // TODO(adb): needed?
424 
425  auto& data_mgr = catalog_.getDataMgr();
426  if (data_mgr.gpusPresent()) {
427  // flush any GPU copies of the updated chunk
428  data_mgr.deleteChunksWithPrefix(chunk_key,
429  Data_Namespace::MemoryLevel::GPU_LEVEL);
430  }
431  };
432  return callback;
433  } else {
434  auto callback = [this,
435  &delete_parameters](FragmentUpdaterType const& update_log) -> void {
436  auto entries_per_column = update_log.getEntryCount();
437  auto rows_per_column = update_log.getRowCount();
438  if (rows_per_column == 0) {
439  return;
440  }
441  DeleteVictimOffsetList victim_offsets(rows_per_column);
442 
443  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
444  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
445  auto usable_threads = normalized_cpu_threads();
446 
447  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
448  complete_row_block_size = rows_per_column;
449  partial_row_block_size = 0;
450  usable_threads = 1;
451  }
452 
453  std::atomic<size_t> row_idx{0};
454 
455  auto process_rows = [&update_log, &victim_offsets, &row_idx](
456  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
457  uint64_t entries_processed = 0;
458 
459  for (uint64_t entry_index = entry_start;
460  entry_index < (entry_start + entry_count);
461  entry_index++) {
462  auto const row(update_log.getEntryAt(entry_index));
463 
464  if (row.empty()) {
465  continue;
466  }
467 
468  entries_processed++;
469  size_t row_index = row_idx.fetch_add(1);
470 
471  auto terminal_column_iter = std::prev(row.end());
472  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
473  CHECK(scalar_tv);
474 
475  uint64_t fragment_offset =
476  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
477  victim_offsets[row_index] = fragment_offset;
478  }
479  return entries_processed;
480  };
481 
482  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
483  return thread_index * complete_row_block_size;
484  };
485 
486  RowProcessingFuturesVector row_processing_futures;
487  row_processing_futures.reserve(usable_threads);
488 
489  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
490  row_processing_futures.emplace_back(
491  std::async(std::launch::async,
492  std::forward<decltype(process_rows)>(process_rows),
493  get_row_index(i),
494  complete_row_block_size));
495  }
496  if (partial_row_block_size) {
497  row_processing_futures.emplace_back(
498  std::async(std::launch::async,
499  std::forward<decltype(process_rows)>(process_rows),
500  get_row_index(usable_threads),
501  partial_row_block_size));
502  }
503 
504  uint64_t rows_processed(0);
505  for (auto& t : row_processing_futures) {
506  t.wait();
507  rows_processed += t.get();
508  }
509 
510  auto const* table_descriptor =
511  catalog_.getMetadataForTable(update_log.getPhysicalTableId());
512  CHECK(!table_is_temporary(table_descriptor));
513  auto* fragmenter = table_descriptor->fragmenter.get();
514  CHECK(fragmenter);
515 
516  auto const* deleted_column_desc = catalog_.getDeletedColumn(table_descriptor);
517  CHECK(deleted_column_desc);
518  fragmenter->updateColumn(&catalog_,
519  table_descriptor,
520  deleted_column_desc,
521  update_log.getFragmentId(),
522  victim_offsets,
523  ScalarTargetValue(int64_t(1L)),
524  update_log.getColumnType(0),
525  Data_Namespace::MemoryLevel::CPU_LEVEL,
526  delete_parameters.getTransactionTracker());
527  };
528  return callback;
529  }
530 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
CatalogType const & catalog_
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define UNLIKELY(x)
Definition: likely.h:20
FRAGMENT_UPDATER FragmentUpdaterType
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:35
int normalized_cpu_threads() const
std::vector< uint64_t > DeleteVictimOffsetList
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
+ Here is the caller graph for this function:

◆ yieldUpdateCallback()

template<typename EXECUTOR_TRAITS , typename FRAGMENT_UPDATER >
StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::UpdateCallback StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::yieldUpdateCallback ( UpdateTransactionParameters & update_parameters)

Definition at line 126 of file StorageIOFacility.h.

Referenced by StorageIOFacility< RelAlgExecutorTraits >::StorageIOFacility().

127  {
128  using OffsetVector = std::vector<uint64_t>;
129  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
130  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
131 
132  if (update_parameters.isVarlenUpdateRequired()) {
133  auto callback = [this,
134  &update_parameters](FragmentUpdaterType const& update_log) -> void {
135  std::vector<const ColumnDescriptor*> columnDescriptors;
136  std::vector<TargetMetaInfo> sourceMetaInfos;
137 
138  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size(); idx++) {
139  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
140  auto target_column =
141  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
142  columnDescriptors.push_back(target_column);
143  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
144  }
145 
146  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
147  auto* fragmenter = td->fragmenter.get();
148  CHECK(fragmenter);
149 
150  fragmenter->updateColumns(
151  &catalog_,
152  td,
153  update_log.getFragmentId(),
154  sourceMetaInfos,
155  columnDescriptors,
156  update_log,
157  update_parameters.getUpdateColumnCount(), // last column of result set
158  Data_Namespace::MemoryLevel::CPU_LEVEL,
159  update_parameters.getTransactionTracker(),
160  executor_);
161  };
162  return callback;
163  } else if (update_parameters.tableIsTemporary()) {
164  auto callback = [this,
165  &update_parameters](FragmentUpdaterType const& update_log) -> void {
166  auto rs = update_log.getResultSet();
167  CHECK(rs->didOutputColumnar());
168  CHECK(rs->isDirectColumnarConversionPossible());
169  CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
170  CHECK_EQ(rs->colCount(), size_t(1));
171 
172  // Temporary table updates require the full projected column
173  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
174 
175  auto& fragment_info = update_log.getFragmentInfo();
176  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
177  CHECK(td);
178  const auto cd = catalog_.getMetadataForColumn(
179  td->tableId, update_parameters.getUpdateColumnNames().front());
180  CHECK(cd);
181  auto chunk_metadata =
182  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
183  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
184  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
185  td->tableId,
186  cd->columnId,
187  fragment_info.fragmentId};
188  auto chunk = Chunk_NS::Chunk::getChunk(cd,
189  &catalog_.getDataMgr(),
190  chunk_key,
191  Data_Namespace::MemoryLevel::CPU_LEVEL,
192  0,
193  chunk_metadata->second->numBytes,
194  chunk_metadata->second->numElements);
195  CHECK(chunk);
196  auto chunk_buffer = chunk->getBuffer();
197  CHECK(chunk_buffer && chunk_buffer->has_encoder);
198 
199  auto encoder = chunk_buffer->encoder.get();
200  CHECK(encoder);
201 
202  auto owned_buffer =
203  StorageIOFacility<EXECUTOR_TRAITS, FRAGMENT_UPDATER>::getRsBufferNoPadding(
204  rs.get(), 0, cd->columnType, rs->rowCount());
205  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
206  const auto new_chunk_metadata =
207  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
208  CHECK(new_chunk_metadata);
209 
210  auto fragmenter = td->fragmenter.get();
211  CHECK(fragmenter);
212 
213  // The fragmenter copy of the fragment info differs from the copy used by the query
214  // engine. Update metadata in the fragmenter directly.
215  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
216  // TODO: we may want to put this directly in the fragmenter so we are under the
217  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
218  // allowed in this path.
219 
220  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
221  fragment->shadowChunkMetadataMap =
222  fragment->getChunkMetadataMap(); // TODO(adb): needed?
223 
224  auto& data_mgr = catalog_.getDataMgr();
225  if (data_mgr.gpusPresent()) {
226  // flush any GPU copies of the updated chunk
227  data_mgr.deleteChunksWithPrefix(chunk_key,
228  Data_Namespace::MemoryLevel::GPU_LEVEL);
229  }
230  };
231  return callback;
232  } else {
233  auto callback = [this,
234  &update_parameters](FragmentUpdaterType const& update_log) -> void {
235  auto entries_per_column = update_log.getEntryCount();
236  auto rows_per_column = update_log.getRowCount();
237  if (rows_per_column == 0) {
238  return;
239  }
240 
241  OffsetVector column_offsets(rows_per_column);
242  ScalarTargetValueVector scalar_target_values(rows_per_column);
243 
244  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
245  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
246  auto usable_threads = normalized_cpu_threads();
247  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
248  complete_entry_block_size = entries_per_column;
249  partial_row_block_size = 0;
250  usable_threads = 1;
251  }
252 
253  std::atomic<size_t> row_idx{0};
254 
255  auto process_rows =
256  [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
257  auto get_entry_at_func,
258  uint64_t column_index,
259  uint64_t entry_start,
260  uint64_t entry_count) -> uint64_t {
261  uint64_t entries_processed = 0;
262  for (uint64_t entry_index = entry_start;
263  entry_index < (entry_start + entry_count);
264  entry_index++) {
265  const auto& row = get_entry_at_func(entry_index);
266  if (row.empty()) {
267  continue;
268  }
269 
270  entries_processed++;
271  size_t row_index = row_idx.fetch_add(1);
272 
273  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
274 
275  auto terminal_column_iter = std::prev(row.end());
276  const auto frag_offset_scalar_tv =
277  boost::get<ScalarTargetValue>(&*terminal_column_iter);
278  CHECK(frag_offset_scalar_tv);
279 
280  column_offsets[row_index] =
281  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
282  scalar_target_values[row_index] =
283  boost::get<ScalarTargetValue>(row[column_index]);
284  }
285  return entries_processed;
286  };
287 
288  auto get_row_index =
289  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
290  return (thread_index * complete_entry_block_size);
291  };
292 
293  // Iterate over each column
294  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
295  column_index < update_parameters.getUpdateColumnCount();
296  column_index++) {
297  row_idx = 0;
298  RowProcessingFuturesVector entry_processing_futures;
299  entry_processing_futures.reserve(usable_threads);
300 
301  auto get_entry_at_func = [&update_log, &column_index](const size_t entry_index) {
302  if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
303  return update_log.getTranslatedEntryAt(entry_index);
304  } else {
305  return update_log.getEntryAt(entry_index);
306  }
307  };
308 
309  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
310  entry_processing_futures.emplace_back(
311  std::async(std::launch::async,
312  std::forward<decltype(process_rows)>(process_rows),
313  get_entry_at_func,
314  column_index,
315  get_row_index(i),
316  complete_entry_block_size));
317  }
318  if (partial_row_block_size) {
319  entry_processing_futures.emplace_back(
320  std::async(std::launch::async,
321  std::forward<decltype(process_rows)>(process_rows),
322  get_entry_at_func,
323  column_index,
324  get_row_index(usable_threads),
325  partial_row_block_size));
326  }
327 
328  uint64_t entries_processed(0);
329  for (auto& t : entry_processing_futures) {
330  t.wait();
331  entries_processed += t.get();
332  }
333 
334  CHECK(row_idx == rows_per_column);
335 
336  const auto table_id = update_log.getPhysicalTableId();
337  auto const* table_descriptor =
338  catalog_.getMetadataForTable(update_log.getPhysicalTableId());
339  CHECK(table_descriptor);
340  const auto fragmenter = table_descriptor->fragmenter;
341  CHECK(fragmenter);
342  auto const* target_column = catalog_.getMetadataForColumn(
343  table_id, update_parameters.getUpdateColumnNames()[column_index]);
344 
345  fragmenter->updateColumn(&catalog_,
346  table_descriptor,
347  target_column,
348  update_log.getFragmentId(),
349  column_offsets,
350  scalar_target_values,
351  update_log.getColumnType(column_index),
352  Data_Namespace::MemoryLevel::CPU_LEVEL,
353  update_parameters.getTransactionTracker());
354  }
355  };
356  return callback;
357  }
358 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
CatalogType const & catalog_
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
ExecutorType * executor_
#define UNLIKELY(x)
Definition: likely.h:20
FRAGMENT_UPDATER FragmentUpdaterType
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:35
int normalized_cpu_threads() const
+ Here is the caller graph for this function:

Member Data Documentation

◆ catalog_

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
CatalogType const& StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::catalog_
private

◆ executor_

template<typename EXECUTOR_TRAITS, typename FRAGMENT_UPDATER = UpdateLogForFragment>
ExecutorType* StorageIOFacility< EXECUTOR_TRAITS, FRAGMENT_UPDATER >::executor_
private

The documentation for this class was generated from the following file: