OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StorageIOFacility Class Reference

#include <StorageIOFacility.h>

+ Inheritance diagram for StorageIOFacility:
+ Collaboration diagram for StorageIOFacility:

Classes

struct  DeleteTransactionParameters
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Public Member Functions

 StorageIOFacility (Executor *executor, Catalog_Namespace::Catalog const &catalog)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Static Private Member Functions

static std::unique_ptr< int8_t[]> getRsBufferNoPadding (const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
 

Private Attributes

Executor * executor_
 
Catalog_Namespace::Catalog const & catalog_
 

Detailed Description

Definition at line 106 of file StorageIOFacility.h.

Member Typedef Documentation

using StorageIOFacility::ColumnValidationFunction = std::function<bool(std::string const&)>

Definition at line 119 of file StorageIOFacility.h.

using StorageIOFacility::DeleteVictimOffsetList = std::vector<uint64_t>

Definition at line 111 of file StorageIOFacility.h.

using StorageIOFacility::TransactionLogPtr = std::unique_ptr<TransactionLog>

Definition at line 118 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetColumnNamesList = std::vector<std::string>

Definition at line 114 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetOffsetList = std::vector<uint64_t>

Definition at line 112 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetTypeList = std::vector<TargetMetaInfo>

Definition at line 113 of file StorageIOFacility.h.

Constructor & Destructor Documentation

StorageIOFacility::StorageIOFacility ( Executor *  executor,
Catalog_Namespace::Catalog const &  catalog 
)
inline

Definition at line 197 of file StorageIOFacility.h.

198  : executor_(executor), catalog_(catalog) {}
Catalog_Namespace::Catalog const & catalog_

Member Function Documentation

static std::unique_ptr<int8_t[]> StorageIOFacility::getRsBufferNoPadding ( const ResultSet *  rs,
size_t  col_idx,
const SQLTypeInfo &  column_type,
size_t  row_count 
)
inlinestaticprivate

Definition at line 642 of file StorageIOFacility.h.

References CHECK, SQLTypeInfo::get_logical_size(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_fp(), and kFLOAT.

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

645  {
646  const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
647  const auto type_size = column_type.is_dict_encoded_string()
648  ? column_type.get_size()
649  : column_type.get_logical_size();
650 
651  auto rs_buffer_size = padded_size * row_count;
652  auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
653  rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
654 
655  if (type_size < padded_size) {
656  // else we're going to remove padding and we do it inplace in the same buffer
657  // we can do updates inplace in the same buffer because type_size < padded_size
658  // for some types, like kFLOAT, simple memcpy is not enough
659  auto src_ptr = rs_buffer.get();
660  auto dst_ptr = rs_buffer.get();
661  if (column_type.is_fp()) {
662  CHECK(column_type.get_type() == kFLOAT);
663  CHECK(padded_size == sizeof(double));
664  for (size_t i = 0; i < row_count; i++) {
665  const auto old_val = *reinterpret_cast<double*>(may_alias_ptr(src_ptr));
666  auto new_val = static_cast<float>(old_val);
667  std::memcpy(dst_ptr, &new_val, type_size);
668  dst_ptr += type_size;
669  src_ptr += padded_size;
670  }
671  } else {
672  // otherwise just take first type_size bytes from the padded value
673  for (size_t i = 0; i < row_count; i++) {
674  std::memcpy(dst_ptr, src_ptr, type_size);
675  dst_ptr += type_size;
676  src_ptr += padded_size;
677  }
678  }
679  }
680  return rs_buffer;
681  }
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
bool is_fp() const
Definition: sqltypes.h:604
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
int get_logical_size() const
Definition: sqltypes.h:424
#define CHECK(condition)
Definition: Logger.h:222
bool is_dict_encoded_string() const
Definition: sqltypes.h:652

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int StorageIOFacility::normalized_cpu_threads ( ) const
inlineprivate

Definition at line 640 of file StorageIOFacility.h.

References cpu_threads().

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

640 { return cpu_threads() / 2; }
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StorageIOFacility::UpdateCallback StorageIOFacility::yieldDeleteCallback ( DeleteTransactionParameters &  delete_parameters)
inline

Definition at line 460 of file StorageIOFacility.h.

References threading_serial::async(), catalog_, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DataMgr::deleteChunksWithPrefix(), Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getDeletedColumn(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), UpdateLogForFragment::getFragmentInfo(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getResultSet(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::TransactionParameters::getTransactionTracker(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, kBOOLEAN, normalized_cpu_threads(), table_is_temporary(), StorageIOFacility::TransactionParameters::tableIsTemporary(), and UNLIKELY.

Referenced by RelAlgExecutor::executeDelete().

461  {
462  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
463 
464  if (delete_parameters.tableIsTemporary()) {
465  auto logical_table_id = delete_parameters.getTableDescriptor()->tableId;
466  auto callback = [this, logical_table_id](UpdateLogForFragment const& update_log,
467  TableUpdateMetadata&) -> void {
468  auto rs = update_log.getResultSet();
469  CHECK(rs->didOutputColumnar());
470  CHECK(rs->isDirectColumnarConversionPossible());
471  CHECK_EQ(rs->colCount(), size_t(1));
472 
473  // Temporary table updates require the full projected column
474  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
475 
476  const ChunkKey lock_chunk_key{catalog_.getCurrentDB().dbId, logical_table_id};
477  const auto table_lock =
478  lockmgr::TableDataLockMgr::getWriteLockForTable(catalog_, lock_chunk_key);
479 
480  auto& fragment_info = update_log.getFragmentInfo();
481  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
482  CHECK(td);
483  const auto cd = catalog_.getDeletedColumn(td);
484  CHECK(cd);
485  CHECK(cd->columnType.get_type() == kBOOLEAN);
486  auto chunk_metadata =
487  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
488  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
489  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
490  td->tableId,
491  cd->columnId,
492  fragment_info.fragmentId};
493  auto chunk = Chunk_NS::Chunk::getChunk(cd,
494  &catalog_.getDataMgr(),
495  chunk_key,
496  Data_Namespace::MemoryLevel::CPU_LEVEL,
497  0,
498  chunk_metadata->second->numBytes,
499  chunk_metadata->second->numElements);
500  CHECK(chunk);
501  auto chunk_buffer = chunk->getBuffer();
502  CHECK(chunk_buffer);
503 
504  auto encoder = chunk_buffer->getEncoder();
505  CHECK(encoder);
506 
507  auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
508  rs.get(), 0, cd->columnType, rs->rowCount());
509  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
510 
511  const auto new_chunk_metadata =
512  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
513 
514  auto fragmenter = td->fragmenter.get();
515  CHECK(fragmenter);
516 
517  // The fragmenter copy of the fragment info differs from the copy used by the
518  // query engine. Update metadata in the fragmenter directly.
519  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
520  // TODO: we may want to put this directly in the fragmenter so we are under the
521  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
522  // allowed in this path.
523 
524  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
525  fragment->shadowChunkMetadataMap =
526  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
527 
528  auto& data_mgr = catalog_.getDataMgr();
529  if (data_mgr.gpusPresent()) {
530  // flush any GPU copies of the updated chunk
531  data_mgr.deleteChunksWithPrefix(chunk_key,
533  }
534  };
535  return callback;
536  } else {
537  auto callback = [this, &delete_parameters](
538  UpdateLogForFragment const& update_log,
539  TableUpdateMetadata& table_update_metadata) -> void {
540  auto entries_per_column = update_log.getEntryCount();
541  auto rows_per_column = update_log.getRowCount();
542  if (rows_per_column == 0) {
543  return;
544  }
545  DeleteVictimOffsetList victim_offsets(rows_per_column);
546 
547  auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
548  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
549  auto usable_threads = normalized_cpu_threads();
550 
551  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
552  complete_row_block_size = rows_per_column;
553  partial_row_block_size = 0;
554  usable_threads = 1;
555  }
556 
557  std::atomic<size_t> row_idx{0};
558 
559  auto process_rows = [&update_log, &victim_offsets, &row_idx](
560  uint64_t entry_start, uint64_t entry_count) -> uint64_t {
561  uint64_t entries_processed = 0;
562 
563  for (uint64_t entry_index = entry_start;
564  entry_index < (entry_start + entry_count);
565  entry_index++) {
566  auto const row(update_log.getEntryAt(entry_index));
567 
568  if (row.empty()) {
569  continue;
570  }
571 
572  entries_processed++;
573  size_t row_index = row_idx.fetch_add(1);
574 
575  auto terminal_column_iter = std::prev(row.end());
576  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
577  CHECK(scalar_tv);
578 
579  uint64_t fragment_offset =
580  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
581  victim_offsets[row_index] = fragment_offset;
582  }
583  return entries_processed;
584  };
585 
586  auto get_row_index =
587  [complete_row_block_size](uint64_t thread_index) -> uint64_t {
588  return thread_index * complete_row_block_size;
589  };
590 
591  RowProcessingFuturesVector row_processing_futures;
592  row_processing_futures.reserve(usable_threads);
593 
594  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
595  row_processing_futures.emplace_back(
597  std::forward<decltype(process_rows)>(process_rows),
598  get_row_index(i),
599  complete_row_block_size));
600  }
601  if (partial_row_block_size) {
602  row_processing_futures.emplace_back(
604  std::forward<decltype(process_rows)>(process_rows),
605  get_row_index(usable_threads),
606  partial_row_block_size));
607  }
608 
609  uint64_t rows_processed(0);
610  for (auto& t : row_processing_futures) {
611  t.wait();
612  rows_processed += t.get();
613  }
614 
615  auto const* table_descriptor =
616  delete_parameters.getTableDescriptor();
617  CHECK(!table_is_temporary(table_descriptor));
618  auto* fragmenter = table_descriptor->fragmenter.get();
619  CHECK(fragmenter);
620 
621  auto const* deleted_column_desc = catalog_.getDeletedColumn(table_descriptor);
622  CHECK(deleted_column_desc);
623  fragmenter->updateColumn(&catalog_,
624  table_descriptor,
625  deleted_column_desc,
626  update_log.getFragmentId(),
627  victim_offsets,
628  ScalarTargetValue(int64_t(1L)),
629  update_log.getColumnType(0),
631  delete_parameters.getTransactionTracker());
632  table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
633  .emplace(update_log.getFragmentId());
634  };
635  return callback;
636  }
637  }
SQLTypeInfo getColumnType(const size_t col_idx) const
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int > ChunkKey
Definition: types.h:36
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
Definition: Catalog.cpp:3661
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
int normalized_cpu_threads() const
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
Definition: Execute.h:346
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
Definition: Execute.h:343
FragmentInfoType const & getFragmentInfo() const
Catalog_Namespace::Catalog const & catalog_
#define UNLIKELY(x)
Definition: likely.h:25
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
bool table_is_temporary(const TableDescriptor *const td)
#define CHECK(condition)
Definition: Logger.h:222
std::vector< uint64_t > DeleteVictimOffsetList
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
auto getResultSet() const
Definition: Execute.h:354
size_t const getRowCount() const override
size_t const getEntryCount() const override
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StorageIOFacility::UpdateCallback StorageIOFacility::yieldUpdateCallback ( UpdateTransactionParameters &  update_parameters)
inline

Definition at line 200 of file StorageIOFacility.h.

References threading_serial::async(), catalog_, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DataMgr::deleteChunksWithPrefix(), executor_, TableDescriptor::fragmenter, Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), UpdateLogForFragment::getFragmentInfo(), StorageIOFacility::TransactionParameters::getInputSourceNode(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getResultSet(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::UpdateTransactionParameters::getTargetsMetaInfo(), StorageIOFacility::TransactionParameters::getTransactionTracker(), UpdateLogForFragment::getTranslatedEntryAt(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnCount(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnNames(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, SQLTypeInfo::is_string(), StorageIOFacility::UpdateTransactionParameters::isVarlenUpdateRequired(), normalized_cpu_threads(), anonymous_namespace{StorageIOFacility.h}::should_recompute_metadata(), StorageIOFacility::TransactionParameters::tableIsTemporary(), UNLIKELY, and foreign_storage::update_stats().

Referenced by RelAlgExecutor::executeUpdate().

201  {
202  using OffsetVector = std::vector<uint64_t>;
203  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
204  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
205 
206  if (update_parameters.isVarlenUpdateRequired()) {
207  auto callback = [this, &update_parameters](
208  UpdateLogForFragment const& update_log,
209  TableUpdateMetadata& table_update_metadata) -> void {
210  std::vector<const ColumnDescriptor*> columnDescriptors;
211  std::vector<TargetMetaInfo> sourceMetaInfos;
212 
213  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size();
214  idx++) {
215  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
216  auto target_column =
217  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
218  columnDescriptors.push_back(target_column);
219  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
220  }
221 
222  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
223  auto* fragmenter = td->fragmenter.get();
224  CHECK(fragmenter);
225 
226  fragmenter->updateColumns(
227  &catalog_,
228  td,
229  update_log.getFragmentId(),
230  sourceMetaInfos,
231  columnDescriptors,
232  update_log,
233  update_parameters.getUpdateColumnCount(), // last column of result set
235  update_parameters.getTransactionTracker(),
236  executor_);
237  table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
238  update_log.getFragmentId());
239  };
240  return callback;
241  } else if (update_parameters.tableIsTemporary()) {
242  auto callback = [this, &update_parameters](UpdateLogForFragment const& update_log,
243  TableUpdateMetadata&) -> void {
244  auto rs = update_log.getResultSet();
245  CHECK(rs->didOutputColumnar());
246  CHECK(rs->isDirectColumnarConversionPossible());
247  CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
248  CHECK_EQ(rs->colCount(), size_t(1));
249 
250  // Temporary table updates require the full projected column
251  CHECK_EQ(rs->rowCount(), update_log.getRowCount());
252 
253  ChunkKey chunk_key_prefix{catalog_.getCurrentDB().dbId,
254  update_parameters.getTableDescriptor()->tableId};
255  const auto table_lock =
256  lockmgr::TableDataLockMgr::getWriteLockForTable(catalog_, chunk_key_prefix);
257 
258  auto& fragment_info = update_log.getFragmentInfo();
259  const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
260  CHECK(td);
261  const auto cd = catalog_.getMetadataForColumn(
262  td->tableId, update_parameters.getUpdateColumnNames().front());
263  CHECK(cd);
264  auto chunk_metadata =
265  fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
266  CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
267  ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
268  td->tableId,
269  cd->columnId,
270  fragment_info.fragmentId};
271  auto chunk = Chunk_NS::Chunk::getChunk(cd,
272  &catalog_.getDataMgr(),
273  chunk_key,
274  Data_Namespace::MemoryLevel::CPU_LEVEL,
275  0,
276  chunk_metadata->second->numBytes,
277  chunk_metadata->second->numElements);
278  CHECK(chunk);
279  auto chunk_buffer = chunk->getBuffer();
280  CHECK(chunk_buffer);
281 
282  auto encoder = chunk_buffer->getEncoder();
283  CHECK(encoder);
284 
285  auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
286  rs.get(), 0, cd->columnType, rs->rowCount());
287  auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());
288 
289  const auto new_chunk_metadata =
290  encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
291  CHECK(new_chunk_metadata);
292 
293  auto fragmenter = td->fragmenter.get();
294  CHECK(fragmenter);
295 
296  // The fragmenter copy of the fragment info differs from the copy used by the
297  // query engine. Update metadata in the fragmenter directly.
298  auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
299  // TODO: we may want to put this directly in the fragmenter so we are under the
300  // fragmenter lock. But, concurrent queries on the same fragmenter should not be
301  // allowed in this path.
302 
303  fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
304  fragment->shadowChunkMetadataMap =
305  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
306 
307  auto& data_mgr = catalog_.getDataMgr();
308  if (data_mgr.gpusPresent()) {
309  // flush any GPU copies of the updated chunk
310  data_mgr.deleteChunksWithPrefix(chunk_key,
311  Data_Namespace::MemoryLevel::GPU_LEVEL);
312  }
313  };
314  return callback;
315  } else {
316  auto callback = [this, &update_parameters](
317  UpdateLogForFragment const& update_log,
318  TableUpdateMetadata& table_update_metadata) -> void {
319  auto entries_per_column = update_log.getEntryCount();
320  auto rows_per_column = update_log.getRowCount();
321  if (rows_per_column == 0) {
322  return;
323  }
324 
325  OffsetVector column_offsets(rows_per_column);
326  ScalarTargetValueVector scalar_target_values(rows_per_column);
327 
328  auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
329  auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
330  auto usable_threads = normalized_cpu_threads();
331  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
332  complete_entry_block_size = entries_per_column;
333  partial_row_block_size = 0;
334  usable_threads = 1;
335  }
336 
337  std::atomic<size_t> row_idx{0};
338 
339  auto process_rows =
340  [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
341  auto get_entry_at_func,
342  uint64_t column_index,
343  uint64_t entry_start,
344  uint64_t entry_count) -> uint64_t {
345  uint64_t entries_processed = 0;
346  for (uint64_t entry_index = entry_start;
347  entry_index < (entry_start + entry_count);
348  entry_index++) {
349  const auto& row = get_entry_at_func(entry_index);
350  if (row.empty()) {
351  continue;
352  }
353 
354  entries_processed++;
355  size_t row_index = row_idx.fetch_add(1);
356 
357  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
358 
359  auto terminal_column_iter = std::prev(row.end());
360  const auto frag_offset_scalar_tv =
361  boost::get<ScalarTargetValue>(&*terminal_column_iter);
362  CHECK(frag_offset_scalar_tv);
363 
364  column_offsets[row_index] =
365  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
366  scalar_target_values[row_index] =
367  boost::get<ScalarTargetValue>(row[column_index]);
368  }
369  return entries_processed;
370  };
371 
372  auto get_row_index =
373  [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
374  return (thread_index * complete_entry_block_size);
375  };
376 
377  auto const* table_descriptor =
378  update_parameters.getTableDescriptor();
379  auto fragment_id = update_log.getFragmentId();
380  auto table_id = update_log.getPhysicalTableId();
381  if (!table_descriptor) {
382  const auto* input_source_node = update_parameters.getInputSourceNode();
383  if (auto proj_node = dynamic_cast<const RelProject*>(input_source_node)) {
384  if (proj_node->hasPushedDownWindowExpr() ||
385  proj_node->hasWindowFunctionExpr()) {
386  table_id = proj_node->getModifiedTableDescriptor()->tableId;
387  table_descriptor = catalog_.getMetadataForTable(table_id);
388  }
389  }
390  }
391  CHECK(table_descriptor);
392 
393  // Iterate over each column
394  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
395  column_index < update_parameters.getUpdateColumnCount();
396  column_index++) {
397  row_idx = 0;
398  RowProcessingFuturesVector entry_processing_futures;
399  entry_processing_futures.reserve(usable_threads);
400 
401  auto get_entry_at_func = [&update_log,
402  &column_index](const size_t entry_index) {
403  if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
404  return update_log.getTranslatedEntryAt(entry_index);
405  } else {
406  return update_log.getEntryAt(entry_index);
407  }
408  };
409 
410  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
411  entry_processing_futures.emplace_back(
413  std::forward<decltype(process_rows)>(process_rows),
414  get_entry_at_func,
415  column_index,
416  get_row_index(i),
417  complete_entry_block_size));
418  }
419  if (partial_row_block_size) {
420  entry_processing_futures.emplace_back(
422  std::forward<decltype(process_rows)>(process_rows),
423  get_entry_at_func,
424  column_index,
425  get_row_index(usable_threads),
426  partial_row_block_size));
427  }
428 
429  uint64_t entries_processed(0);
430  for (auto& t : entry_processing_futures) {
431  t.wait();
432  entries_processed += t.get();
433  }
434 
435  CHECK(row_idx == rows_per_column);
436  const auto fragmenter = table_descriptor->fragmenter;
437  CHECK(fragmenter);
438  auto const* target_column = catalog_.getMetadataForColumn(
439  table_id, update_parameters.getUpdateColumnNames()[column_index]);
440  auto update_stats =
441  fragmenter->updateColumn(&catalog_,
442  table_descriptor,
443  target_column,
444  fragment_id,
445  column_offsets,
446  scalar_target_values,
447  update_log.getColumnType(column_index),
449  update_parameters.getTransactionTracker());
451  table_update_metadata.columns_for_metadata_update[target_column].emplace(
452  fragment_id);
453  }
454  }
455  };
456  return callback;
457  }
458  }
SQLTypeInfo getColumnType(const size_t col_idx) const
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int > ChunkKey
Definition: types.h:36
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
bool should_recompute_metadata(const std::optional< Fragmenter_Namespace::ChunkUpdateStats > &update_stats)
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
int normalized_cpu_threads() const
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
Definition: Execute.h:346
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
Definition: Execute.h:343
FragmentInfoType const & getFragmentInfo() const
Catalog_Namespace::Catalog const & catalog_
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
#define UNLIKELY(x)
Definition: likely.h:25
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
#define CHECK(condition)
Definition: Logger.h:222
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:600
auto getResultSet() const
Definition: Execute.h:354
size_t const getRowCount() const override
size_t const getEntryCount() const override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog const& StorageIOFacility::catalog_
private

Definition at line 684 of file StorageIOFacility.h.

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

Executor* StorageIOFacility::executor_
private

Definition at line 683 of file StorageIOFacility.h.

Referenced by yieldUpdateCallback().


The documentation for this class was generated from the following file: