OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StorageIOFacility Class Reference

#include <StorageIOFacility.h>

+ Inheritance diagram for StorageIOFacility:
+ Collaboration diagram for StorageIOFacility:

Classes

struct  DeleteTransactionParameters
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Public Member Functions

 StorageIOFacility (Executor *executor)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Static Private Member Functions

static std::unique_ptr< int8_t[]> getRsBufferNoPadding (const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
 

Private Attributes

Executor * executor_
 

Detailed Description

Definition at line 106 of file StorageIOFacility.h.

Member Typedef Documentation

using StorageIOFacility::ColumnValidationFunction = std::function<bool(std::string const&)>

Definition at line 119 of file StorageIOFacility.h.

using StorageIOFacility::DeleteVictimOffsetList = std::vector<uint64_t>

Definition at line 111 of file StorageIOFacility.h.

using StorageIOFacility::TransactionLogPtr = std::unique_ptr<TransactionLog>

Definition at line 118 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetColumnNamesList = std::vector<std::string>

Definition at line 114 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetOffsetList = std::vector<uint64_t>

Definition at line 112 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetTypeList = std::vector<TargetMetaInfo>

Definition at line 113 of file StorageIOFacility.h.

Constructor & Destructor Documentation

StorageIOFacility::StorageIOFacility ( Executor * executor)
inline

Definition at line 204 of file StorageIOFacility.h.

204 : executor_(executor) {}

Member Function Documentation

static std::unique_ptr<int8_t[]> StorageIOFacility::getRsBufferNoPadding ( const ResultSet * rs,
size_t  col_idx,
const SQLTypeInfo & column_type,
size_t  row_count 
)
inline static private

Definition at line 651 of file StorageIOFacility.h.

References CHECK, SQLTypeInfo::get_logical_size(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_fp(), and kFLOAT.

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

654  {
655  const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
656  const auto type_size = column_type.is_dict_encoded_string()
657  ? column_type.get_size()
658  : column_type.get_logical_size();
659 
660  auto rs_buffer_size = padded_size * row_count;
661  auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
662  rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
663 
664  if (type_size < padded_size) {
665  // else we're going to remove padding and we do it inplace in the same buffer
666  // we can do updates inplace in the same buffer because type_size < padded_size
667  // for some types, like kFLOAT, simple memcpy is not enough
668  auto src_ptr = rs_buffer.get();
669  auto dst_ptr = rs_buffer.get();
670  if (column_type.is_fp()) {
671  CHECK(column_type.get_type() == kFLOAT);
672  CHECK(padded_size == sizeof(double));
673  for (size_t i = 0; i < row_count; i++) {
674  const auto old_val = *reinterpret_cast<double*>(may_alias_ptr(src_ptr));
675  auto new_val = static_cast<float>(old_val);
676  std::memcpy(dst_ptr, &new_val, type_size);
677  dst_ptr += type_size;
678  src_ptr += padded_size;
679  }
680  } else {
681  // otherwise just take first type_size bytes from the padded value
682  for (size_t i = 0; i < row_count; i++) {
683  std::memcpy(dst_ptr, src_ptr, type_size);
684  dst_ptr += type_size;
685  src_ptr += padded_size;
686  }
687  }
688  }
689  return rs_buffer;
690  }
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
bool is_fp() const
Definition: sqltypes.h:571
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
int get_logical_size() const
Definition: sqltypes.h:419
#define CHECK(condition)
Definition: Logger.h:291
bool is_dict_encoded_string() const
Definition: sqltypes.h:641

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int StorageIOFacility::normalized_cpu_threads ( ) const
inline private

Definition at line 649 of file StorageIOFacility.h.

References cpu_threads().

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

649 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StorageIOFacility::UpdateCallback StorageIOFacility::yieldDeleteCallback ( DeleteTransactionParameters & delete_parameters)
inline

Definition at line 468 of file StorageIOFacility.h.

References threading_serial::async(), CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, StorageIOFacility::TransactionParameters::getCatalog(), Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::TransactionParameters::getTransactionTracker(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, kBOOLEAN, normalized_cpu_threads(), table_is_temporary(), StorageIOFacility::TransactionParameters::tableIsTemporary(), and UNLIKELY.

Referenced by RelAlgExecutor::executeDelete().

469  {
470  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
471 
472  if (delete_parameters.tableIsTemporary()) {
473  auto logical_table_id = delete_parameters.getTableDescriptor()->tableId;
474  const auto& catalog = delete_parameters.getCatalog();
475  auto callback = [logical_table_id, &catalog](UpdateLogForFragment const& update_log,
476  TableUpdateMetadata&) -> void {
477  auto rs = update_log.getResultSet();
478  CHECK(rs->didOutputColumnar());
479  CHECK(rs->isDirectColumnarConversionPossible());
480  CHECK_EQ(rs->colCount(), size_t(1));
481 
482  // Temporary table updates require the full projected column
483  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
484 
485  const ChunkKey lock_chunk_key{catalog.getCurrentDB().dbId, logical_table_id};
486  const auto table_lock =
488 
489  auto& fragment_info = update_log.getFragmentInfo();
490  const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
491  CHECK(td);
492  const auto cd = catalog.getDeletedColumn(td);
493  CHECK(cd);
494  CHECK(cd->columnType.get_type() == kBOOLEAN);
495  auto chunk_metadata =
496  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
497  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
498  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
499  td->tableId,
500  cd->columnId,
501  fragment_info.fragmentId};
502  auto chunk = Chunk_NS::Chunk::getChunk(cd,
503  &catalog.getDataMgr(),
504  chunk_key,
506  0,
507  chunk_metadata->second->numBytes,
508  chunk_metadata->second->numElements);
509  CHECK(chunk);
510  auto chunk_buffer = chunk->getBuffer();
511  CHECK(chunk_buffer);
512 
513  auto encoder = chunk_buffer->getEncoder();
514  CHECK(encoder);
515 
516  auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
517  rs.get(), 0, cd->columnType, rs->rowCount());
518  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
519 
520  const auto new_chunk_metadata =
521  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
522 
523  auto fragmenter = td->fragmenter.get();
524  CHECK(fragmenter);
525 
526  // The fragmenter copy of the fragment info differs from the copy used by the
527  // query engine. Update metadata in the fragmenter directly.
528  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
529  // TODO: we may want to put this directly in the fragmenter so we are under the
530  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
531  // allowed in this path.
532 
533  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
534  fragment->shadowChunkMetadataMap =
535  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
536 
537  auto& data_mgr = catalog.getDataMgr();
538  if (data_mgr.gpusPresent()) {
539  // flush any GPU copies of the updated chunk
540  data_mgr.deleteChunksWithPrefix(chunk_key,
542  }
543  };
544  return callback;
545  } else {
546  auto callback = [this, &delete_parameters](
547  UpdateLogForFragment const& update_log,
548  TableUpdateMetadata& table_update_metadata) -> void {
549  auto entries_per_column = update_log.getEntryCount();
550  auto rows_per_column = update_log.getRowCount();
551  if (rows_per_column == 0) {
552  return;
553  }
554  DeleteVictimOffsetList victim_offsets(rows_per_column);
555 
556  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
557  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
558  auto usable_threads = normalized_cpu_threads();
559 
560  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
561  complete_row_block_size = rows_per_column;
562  partial_row_block_size = 0;
563  usable_threads = 1;
564  }
565 
566  std::atomic<size_t> row_idx{0};
567 
568  auto process_rows = [&update_log, &victim_offsets, &row_idx](
569  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
570  uint64_t entries_processed = 0;
571 
572  for (uint64_t entry_index = entry_start;
573  entry_index < (entry_start + entry_count);
574  entry_index++) {
575  auto const row(update_log.getEntryAt(entry_index));
576 
577  if (row.empty()) {
578  continue;
579  }
580 
581  entries_processed++;
582  size_t row_index = row_idx.fetch_add(1);
583 
584  auto terminal_column_iter = std::prev(row.end());
585  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
586  CHECK(scalar_tv);
587 
588  uint64_t fragment_offset =
589  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
590  victim_offsets[row_index] = fragment_offset;
591  }
592  return entries_processed;
593  };
594 
595  auto get_row_index =
596  [complete_row_block_size](uint64_t thread_index) -> uint64_t {
597  return thread_index * complete_row_block_size;
598  };
599 
600  RowProcessingFuturesVector row_processing_futures;
601  row_processing_futures.reserve(usable_threads);
602 
603  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
604  row_processing_futures.emplace_back(
606  std::forward<decltype(process_rows)>(process_rows),
607  get_row_index(i),
608  complete_row_block_size));
609  }
610  if (partial_row_block_size) {
611  row_processing_futures.emplace_back(
613  std::forward<decltype(process_rows)>(process_rows),
614  get_row_index(usable_threads),
615  partial_row_block_size));
616  }
617 
618  for (auto& t : row_processing_futures) {
619  t.wait();
620  }
621 
622  const auto& catalog = delete_parameters.getCatalog();
623  auto const* table_descriptor =
624  catalog.getMetadataForTable(update_log.getPhysicalTableId());
625  CHECK(table_descriptor);
626  CHECK(!table_is_temporary(table_descriptor));
627  auto* fragmenter = table_descriptor->fragmenter.get();
628  CHECK(fragmenter);
629 
630  auto const* deleted_column_desc = catalog.getDeletedColumn(table_descriptor);
631  CHECK(deleted_column_desc);
632  fragmenter->updateColumn(&catalog,
633  table_descriptor,
634  deleted_column_desc,
635  update_log.getFragmentId(),
636  victim_offsets,
637  ScalarTargetValue(int64_t(1L)),
638  update_log.getColumnType(0),
640  delete_parameters.getTransactionTracker());
641  table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
642  .emplace(update_log.getFragmentId());
643  };
644  return callback;
645  }
646  }
SQLTypeInfo getColumnType(const size_t col_idx) const
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
int normalized_cpu_threads() const
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
Definition: Execute.h:364
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
Definition: Execute.h:361
#define UNLIKELY(x)
Definition: likely.h:25
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:291
std::vector< uint64_t > DeleteVictimOffsetList
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
size_t const getRowCount() const override
size_t const getEntryCount() const override
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StorageIOFacility::UpdateCallback StorageIOFacility::yieldUpdateCallback ( UpdateTransactionParameters & update_parameters)
inline

Definition at line 206 of file StorageIOFacility.h.

References threading_serial::async(), CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, executor_, StorageIOFacility::TransactionParameters::getCatalog(), Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), Catalog_Namespace::Catalog::getCurrentDB(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), UpdateLogForFragment::getFragmentInfo(), StorageIOFacility::TransactionParameters::getInputSourceNode(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getResultSet(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::UpdateTransactionParameters::getTargetsMetaInfo(), StorageIOFacility::TransactionParameters::getTransactionTracker(), UpdateLogForFragment::getTranslatedEntryAt(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnCount(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnNames(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, SQLTypeInfo::is_string(), StorageIOFacility::UpdateTransactionParameters::isVarlenUpdateRequired(), normalized_cpu_threads(), anonymous_namespace{StorageIOFacility.h}::should_recompute_metadata(), StorageIOFacility::TransactionParameters::tableIsTemporary(), UNLIKELY, and foreign_storage::update_stats().

Referenced by RelAlgExecutor::executeUpdate().

207  {
208  using OffsetVector = std::vector<uint64_t>;
209  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
210  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
211 
212  if (update_parameters.isVarlenUpdateRequired()) {
213  auto callback = [this, &update_parameters](
214  UpdateLogForFragment const& update_log,
215  TableUpdateMetadata& table_update_metadata) -> void {
216  std::vector<const ColumnDescriptor*> columnDescriptors;
217  std::vector<TargetMetaInfo> sourceMetaInfos;
218 
219  const auto& catalog = update_parameters.getCatalog();
220  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size();
221  idx++) {
222  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
223  auto target_column =
224  catalog.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
225  columnDescriptors.push_back(target_column);
226  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
227  }
228 
229  auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
230  auto* fragmenter = td->fragmenter.get();
231  CHECK(fragmenter);
232 
233  fragmenter->updateColumns(
234  &catalog,
235  td,
236  update_log.getFragmentId(),
237  sourceMetaInfos,
238  columnDescriptors,
239  update_log,
240  update_parameters.getUpdateColumnCount(), // last column of result set
242  update_parameters.getTransactionTracker(),
243  executor_);
244  table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
245  update_log.getFragmentId());
246  };
247  return callback;
248  } else if (update_parameters.tableIsTemporary()) {
249  auto callback = [&update_parameters](UpdateLogForFragment const& update_log,
250  TableUpdateMetadata&) -> void {
251  auto rs = update_log.getResultSet();
252  CHECK(rs->didOutputColumnar());
253  CHECK(rs->isDirectColumnarConversionPossible());
254  CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
255  CHECK_EQ(rs->colCount(), size_t(1));
256 
257  // Temporary table updates require the full projected column
258  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
259 
260  const auto& catalog = update_parameters.getCatalog();
261  ChunkKey chunk_key_prefix{catalog.getCurrentDB().dbId,
262  update_parameters.getTableDescriptor()->tableId};
263  const auto table_lock =
265 
266  auto& fragment_info = update_log.getFragmentInfo();
267  const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
268  CHECK(td);
269  const auto cd = catalog.getMetadataForColumn(
270  td->tableId, update_parameters.getUpdateColumnNames().front());
271  CHECK(cd);
272  auto chunk_metadata =
273  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
274  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
275  ChunkKey chunk_key{catalog.getCurrentDB().dbId,
276  td->tableId,
277  cd->columnId,
278  fragment_info.fragmentId};
279  auto chunk = Chunk_NS::Chunk::getChunk(cd,
280  &catalog.getDataMgr(),
281  chunk_key,
283  0,
284  chunk_metadata->second->numBytes,
285  chunk_metadata->second->numElements);
286  CHECK(chunk);
287  auto chunk_buffer = chunk->getBuffer();
288  CHECK(chunk_buffer);
289 
290  auto encoder = chunk_buffer->getEncoder();
291  CHECK(encoder);
292 
293  auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
294  rs.get(), 0, cd->columnType, rs->rowCount());
295  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
296 
297  const auto new_chunk_metadata =
298  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
299  CHECK(new_chunk_metadata);
300 
301  auto fragmenter = td->fragmenter.get();
302  CHECK(fragmenter);
303 
304  // The fragmenter copy of the fragment info differs from the copy used by the
305  // query engine. Update metadata in the fragmenter directly.
306  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
307  // TODO: we may want to put this directly in the fragmenter so we are under the
308  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
309  // allowed in this path.
310 
311  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
312  fragment->shadowChunkMetadataMap =
313  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
314 
315  auto& data_mgr = catalog.getDataMgr();
316  if (data_mgr.gpusPresent()) {
317  // flush any GPU copies of the updated chunk
318  data_mgr.deleteChunksWithPrefix(chunk_key,
320  }
321  };
322  return callback;
323  } else {
324  auto callback = [this, &update_parameters](
325  UpdateLogForFragment const& update_log,
326  TableUpdateMetadata& table_update_metadata) -> void {
327  auto entries_per_column = update_log.getEntryCount();
328  auto rows_per_column = update_log.getRowCount();
329  if (rows_per_column == 0) {
330  return;
331  }
332 
333  OffsetVector column_offsets(rows_per_column);
334  ScalarTargetValueVector scalar_target_values(rows_per_column);
335 
336  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
337  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
338  auto usable_threads = normalized_cpu_threads();
339  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
340  complete_entry_block_size = entries_per_column;
341  partial_row_block_size = 0;
342  usable_threads = 1;
343  }
344 
345  std::atomic<size_t> row_idx{0};
346 
347  auto process_rows =
348  [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
349  auto get_entry_at_func,
350  uint64_t column_index,
351  uint64_t entry_start,
352  uint64_t entry_count) -> uint64_t {
353  uint64_t entries_processed = 0;
354  for (uint64_t entry_index = entry_start;
355  entry_index < (entry_start + entry_count);
356  entry_index++) {
357  const auto& row = get_entry_at_func(entry_index);
358  if (row.empty()) {
359  continue;
360  }
361 
362  entries_processed++;
363  size_t row_index = row_idx.fetch_add(1);
364 
365  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
366 
367  auto terminal_column_iter = std::prev(row.end());
368  const auto frag_offset_scalar_tv =
369  boost::get<ScalarTargetValue>(&*terminal_column_iter);
370  CHECK(frag_offset_scalar_tv);
371 
372  column_offsets[row_index] =
373  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
374  scalar_target_values[row_index] =
375  boost::get<ScalarTargetValue>(row[column_index]);
376  }
377  return entries_processed;
378  };
379 
380  auto get_row_index =
381  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
382  return (thread_index * complete_entry_block_size);
383  };
384 
385  const auto& catalog = update_parameters.getCatalog();
386  auto const* table_descriptor =
387  catalog.getMetadataForTable(update_log.getPhysicalTableId());
388  auto fragment_id = update_log.getFragmentId();
389  auto table_id = update_log.getPhysicalTableId();
390  if (!table_descriptor) {
391  const auto* input_source_node = update_parameters.getInputSourceNode();
392  if (auto proj_node = dynamic_cast<const RelProject*>(input_source_node)) {
393  if (proj_node->hasPushedDownWindowExpr() ||
394  proj_node->hasWindowFunctionExpr()) {
395  table_id = proj_node->getModifiedTableDescriptor()->tableId;
396  table_descriptor = catalog.getMetadataForTable(table_id);
397  }
398  }
399  }
400  CHECK(table_descriptor);
401 
402  // Iterate over each column
403  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
404  column_index < update_parameters.getUpdateColumnCount();
405  column_index++) {
406  row_idx = 0;
407  RowProcessingFuturesVector entry_processing_futures;
408  entry_processing_futures.reserve(usable_threads);
409 
410  auto get_entry_at_func = [&update_log,
411  &column_index](const size_t entry_index) {
412  if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
413  return update_log.getTranslatedEntryAt(entry_index);
414  } else {
415  return update_log.getEntryAt(entry_index);
416  }
417  };
418 
419  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
420  entry_processing_futures.emplace_back(
422  std::forward<decltype(process_rows)>(process_rows),
423  get_entry_at_func,
424  column_index,
425  get_row_index(i),
426  complete_entry_block_size));
427  }
428  if (partial_row_block_size) {
429  entry_processing_futures.emplace_back(
431  std::forward<decltype(process_rows)>(process_rows),
432  get_entry_at_func,
433  column_index,
434  get_row_index(usable_threads),
435  partial_row_block_size));
436  }
437 
438  for (auto& t : entry_processing_futures) {
439  t.wait();
440  }
441 
442  CHECK(row_idx == rows_per_column);
443  const auto fragmenter = table_descriptor->fragmenter;
444  CHECK(fragmenter);
445  auto const* target_column = catalog.getMetadataForColumn(
446  table_id, update_parameters.getUpdateColumnNames()[column_index]);
447  CHECK(target_column);
448  auto update_stats =
449  fragmenter->updateColumn(&catalog,
450  table_descriptor,
451  target_column,
452  fragment_id,
453  column_offsets,
454  scalar_target_values,
455  update_log.getColumnType(column_index),
457  update_parameters.getTransactionTracker());
459  table_update_metadata.columns_for_metadata_update[target_column].emplace(
460  fragment_id);
461  }
462  }
463  };
464  return callback;
465  }
466  }
SQLTypeInfo getColumnType(const size_t col_idx) const
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
bool should_recompute_metadata(const std::optional< Fragmenter_Namespace::ChunkUpdateStats > &update_stats)
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
int normalized_cpu_threads() const
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
Definition: Execute.h:364
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
Definition: Execute.h:361
FragmentInfoType const & getFragmentInfo() const
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
#define UNLIKELY(x)
Definition: likely.h:25
#define CHECK(condition)
Definition: Logger.h:291
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:559
auto getResultSet() const
Definition: Execute.h:372
size_t const getRowCount() const override
size_t const getEntryCount() const override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

Executor* StorageIOFacility::executor_
private

Definition at line 692 of file StorageIOFacility.h.

Referenced by yieldUpdateCallback().


The documentation for this class was generated from the following file: