OmniSciDB  ba1bac9284
StorageIOFacility Class Reference

#include <StorageIOFacility.h>


Classes

struct  DeleteTransactionParameters
 
class  TransactionParameters
 
class  UpdateTransactionParameters
 

Public Types

using UpdateCallback = UpdateLogForFragment::Callback
 
using TableDescriptorType = TableDescriptor
 
using DeleteVictimOffsetList = std::vector< uint64_t >
 
using UpdateTargetOffsetList = std::vector< uint64_t >
 
using UpdateTargetTypeList = std::vector< TargetMetaInfo >
 
using UpdateTargetColumnNamesList = std::vector< std::string >
 
using TransactionLog = Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker
 
using TransactionLogPtr = std::unique_ptr< TransactionLog >
 
using ColumnValidationFunction = std::function< bool(std::string const &)>
 

Public Member Functions

 StorageIOFacility (Executor *executor, Catalog_Namespace::Catalog const &catalog)
 
StorageIOFacility::UpdateCallback yieldUpdateCallback (UpdateTransactionParameters &update_parameters)
 
StorageIOFacility::UpdateCallback yieldDeleteCallback (DeleteTransactionParameters &delete_parameters)
 

Private Member Functions

int normalized_cpu_threads () const
 

Static Private Member Functions

static std::unique_ptr< int8_t[]> getRsBufferNoPadding (const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
 

Private Attributes

Executor * executor_
 
Catalog_Namespace::Catalog const & catalog_
 

Detailed Description

Definition at line 106 of file StorageIOFacility.h.

Member Typedef Documentation

using StorageIOFacility::ColumnValidationFunction = std::function<bool(std::string const&)>

Definition at line 119 of file StorageIOFacility.h.

using StorageIOFacility::DeleteVictimOffsetList = std::vector<uint64_t>

Definition at line 111 of file StorageIOFacility.h.

using StorageIOFacility::TransactionLogPtr = std::unique_ptr<TransactionLog>

Definition at line 118 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetColumnNamesList = std::vector<std::string>

Definition at line 114 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetOffsetList = std::vector<uint64_t>

Definition at line 112 of file StorageIOFacility.h.

using StorageIOFacility::UpdateTargetTypeList = std::vector<TargetMetaInfo>

Definition at line 113 of file StorageIOFacility.h.

Constructor & Destructor Documentation

StorageIOFacility::StorageIOFacility ( Executor *  executor,
Catalog_Namespace::Catalog const &  catalog 
)
inline

Definition at line 190 of file StorageIOFacility.h.

  StorageIOFacility(Executor* executor, Catalog_Namespace::Catalog const& catalog)
      : executor_(executor), catalog_(catalog) {}

Member Function Documentation

static std::unique_ptr<int8_t[]> StorageIOFacility::getRsBufferNoPadding ( const ResultSet *  rs,
size_t  col_idx,
const SQLTypeInfo &  column_type,
size_t  row_count 
)
inlinestaticprivate

Definition at line 626 of file StorageIOFacility.h.

References CHECK, SQLTypeInfo::get_logical_size(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), i, SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_fp(), and kFLOAT.

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

  {
    const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
    const auto type_size = column_type.is_dict_encoded_string()
                               ? column_type.get_size()
                               : column_type.get_logical_size();

    auto rs_buffer_size = padded_size * row_count;
    auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
    rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);

    if (type_size < padded_size) {
      // Remove the padding in place in the same buffer. In-place updates are safe
      // because type_size < padded_size, so the write cursor never overtakes the
      // read cursor. For some types, such as kFLOAT, a simple memcpy is not enough.
      auto src_ptr = rs_buffer.get();
      auto dst_ptr = rs_buffer.get();
      if (column_type.is_fp()) {
        CHECK(column_type.get_type() == kFLOAT);
        CHECK(padded_size == sizeof(double));
        for (size_t i = 0; i < row_count; i++) {
          const auto old_val = *reinterpret_cast<double*>(may_alias_ptr(src_ptr));
          auto new_val = static_cast<float>(old_val);
          std::memcpy(dst_ptr, &new_val, type_size);
          dst_ptr += type_size;
          src_ptr += padded_size;
        }
      } else {
        // Otherwise just take the first type_size bytes from each padded value.
        for (size_t i = 0; i < row_count; i++) {
          std::memcpy(dst_ptr, src_ptr, type_size);
          dst_ptr += type_size;
          src_ptr += padded_size;
        }
      }
    }
    return rs_buffer;
  }
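The kFLOAT branch exists because the result set materializes a projected FLOAT column in 8-byte slots holding double values, so removing the padding requires a genuine double-to-float conversion rather than a truncating copy. The following self-contained sketch reproduces the in-place de-padding technique on a hand-built padded buffer (everything here is illustrative; no ResultSet is involved):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <vector>

int main() {
  // Simulate a FLOAT column materialized into 8-byte padded slots, each
  // slot holding a double.
  const size_t row_count = 4;
  const size_t padded_size = sizeof(double);  // 8
  const size_t type_size = sizeof(float);     // 4
  std::vector<double> values{1.5, -2.25, 3.0, 0.125};
  auto buffer = std::make_unique<int8_t[]>(padded_size * row_count);
  std::memcpy(buffer.get(), values.data(), padded_size * row_count);

  // In-place de-pad: safe because type_size < padded_size, so the write
  // cursor never catches up with the read cursor.
  auto src_ptr = buffer.get();
  auto dst_ptr = buffer.get();
  for (size_t i = 0; i < row_count; i++) {
    double old_val;
    std::memcpy(&old_val, src_ptr, sizeof(double));  // memcpy avoids aliasing UB
    const float new_val = static_cast<float>(old_val);
    std::memcpy(dst_ptr, &new_val, type_size);
    dst_ptr += type_size;
    src_ptr += padded_size;
  }

  for (size_t i = 0; i < row_count; i++) {
    float f;
    std::memcpy(&f, buffer.get() + i * type_size, type_size);
    std::cout << f << ' ';  // prints: 1.5 -2.25 3 0.125
  }
  std::cout << '\n';
  return 0;
}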


int StorageIOFacility::normalized_cpu_threads ( ) const
inlineprivate

Definition at line 624 of file StorageIOFacility.h.

References cpu_threads().

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

{ return cpu_threads() / 2; }
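Both yield callbacks size their worker pools with this value, deliberately using only half of the hardware threads reported by cpu_threads() and leaving the rest for other engine work. A small self-contained sketch of the block arithmetic the callbacks derive from it (the figure of 16 hardware threads is an assumed example):

#include <cstdint>
#include <iostream>

int main() {
  // With cpu_threads() == 16, normalized_cpu_threads() would return 8.
  const uint64_t workers = 16 / 2;
  const uint64_t entries = 100;
  const uint64_t complete_block = entries / workers;  // 12 entries per worker
  const uint64_t partial_block = entries % workers;   // 4 leftover entries
  // 8 complete blocks of 12 plus one trailing block of 4 covers all 100.
  std::cout << complete_block << " per worker, " << partial_block
            << " in the trailing partial block\n";
  return 0;
}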


StorageIOFacility::UpdateCallback StorageIOFacility::yieldDeleteCallback ( DeleteTransactionParameters &  delete_parameters)
inline

Definition at line 444 of file StorageIOFacility.h.

References catalog_, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DataMgr::deleteChunksWithPrefix(), Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getDeletedColumn(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), UpdateLogForFragment::getFragmentInfo(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getResultSet(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::TransactionParameters::getTransactionTracker(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, i, kBOOLEAN, normalized_cpu_threads(), t, table_is_temporary(), StorageIOFacility::TransactionParameters::tableIsTemporary(), and UNLIKELY.

Referenced by RelAlgExecutor::executeDelete().

  {
    using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;

    if (delete_parameters.tableIsTemporary()) {
      auto logical_table_id = delete_parameters.getTableDescriptor()->tableId;
      auto callback = [this, logical_table_id](UpdateLogForFragment const& update_log,
                                               TableUpdateMetadata&) -> void {
        auto rs = update_log.getResultSet();
        CHECK(rs->didOutputColumnar());
        CHECK(rs->isDirectColumnarConversionPossible());
        CHECK_EQ(rs->colCount(), size_t(1));

        // Temporary table updates require the full projected column
        CHECK_EQ(rs->rowCount(), update_log.getRowCount());

        const ChunkKey lock_chunk_key{catalog_.getCurrentDB().dbId, logical_table_id};
        const auto table_lock =
            lockmgr::TableDataLockMgr::getWriteLockForTable(lock_chunk_key);

        auto& fragment_info = update_log.getFragmentInfo();
        const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
        CHECK(td);
        const auto cd = catalog_.getDeletedColumn(td);
        CHECK(cd);
        CHECK(cd->columnType.get_type() == kBOOLEAN);
        auto chunk_metadata =
            fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
        CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
                           td->tableId,
                           cd->columnId,
                           fragment_info.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_.getDataMgr(),
                                               chunk_key,
                                               Data_Namespace::MemoryLevel::CPU_LEVEL,
                                               0,
                                               chunk_metadata->second->numBytes,
                                               chunk_metadata->second->numElements);
        CHECK(chunk);
        auto chunk_buffer = chunk->getBuffer();
        CHECK(chunk_buffer);

        auto encoder = chunk_buffer->getEncoder();
        CHECK(encoder);

        auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
            rs.get(), 0, cd->columnType, rs->rowCount());
        auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());

        const auto new_chunk_metadata =
            encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);

        auto fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        // The fragmenter copy of the fragment info differs from the copy used by the
        // query engine. Update metadata in the fragmenter directly.
        auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
        // TODO: we may want to put this directly in the fragmenter so we are under the
        // fragmenter lock. But, concurrent queries on the same fragmenter should not be
        // allowed in this path.

        fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
        fragment->shadowChunkMetadataMap =
            fragment->getChunkMetadataMap();  // TODO(adb): needed?

        auto& data_mgr = catalog_.getDataMgr();
        if (data_mgr.gpusPresent()) {
          // flush any GPU copies of the updated chunk
          data_mgr.deleteChunksWithPrefix(chunk_key,
                                          Data_Namespace::MemoryLevel::GPU_LEVEL);
        }
      };
      return callback;
    } else {
      auto callback = [this, &delete_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        auto entries_per_column = update_log.getEntryCount();
        auto rows_per_column = update_log.getRowCount();
        if (rows_per_column == 0) {
          return;
        }
        DeleteVictimOffsetList victim_offsets(rows_per_column);

        auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
        auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
        auto usable_threads = normalized_cpu_threads();

        if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
          complete_row_block_size = rows_per_column;
          partial_row_block_size = 0;
          usable_threads = 1;
        }

        std::atomic<size_t> row_idx{0};

        auto process_rows = [&update_log, &victim_offsets, &row_idx](
                                uint64_t entry_start, uint64_t entry_count) -> uint64_t {
          uint64_t entries_processed = 0;

          for (uint64_t entry_index = entry_start;
               entry_index < (entry_start + entry_count);
               entry_index++) {
            auto const row(update_log.getEntryAt(entry_index));

            if (row.empty()) {
              continue;
            }

            entries_processed++;
            size_t row_index = row_idx.fetch_add(1);

            auto terminal_column_iter = std::prev(row.end());
            const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
            CHECK(scalar_tv);

            uint64_t fragment_offset =
                static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
            victim_offsets[row_index] = fragment_offset;
          }
          return entries_processed;
        };

        auto get_row_index =
            [complete_row_block_size](uint64_t thread_index) -> uint64_t {
          return thread_index * complete_row_block_size;
        };

        RowProcessingFuturesVector row_processing_futures;
        row_processing_futures.reserve(usable_threads);

        for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
          row_processing_futures.emplace_back(
              std::async(std::launch::async,
                         std::forward<decltype(process_rows)>(process_rows),
                         get_row_index(i),
                         complete_row_block_size));
        }
        if (partial_row_block_size) {
          row_processing_futures.emplace_back(
              std::async(std::launch::async,
                         std::forward<decltype(process_rows)>(process_rows),
                         get_row_index(usable_threads),
                         partial_row_block_size));
        }

        uint64_t rows_processed(0);
        for (auto& t : row_processing_futures) {
          t.wait();
          rows_processed += t.get();
        }

        auto const* table_descriptor =
            delete_parameters.getTableDescriptor();
        CHECK(!table_is_temporary(table_descriptor));
        auto* fragmenter = table_descriptor->fragmenter.get();
        CHECK(fragmenter);

        auto const* deleted_column_desc = catalog_.getDeletedColumn(table_descriptor);
        CHECK(deleted_column_desc);
        fragmenter->updateColumn(&catalog_,
                                 table_descriptor,
                                 deleted_column_desc,
                                 update_log.getFragmentId(),
                                 victim_offsets,
                                 ScalarTargetValue(int64_t(1L)),
                                 update_log.getColumnType(0),
                                 Data_Namespace::MemoryLevel::CPU_LEVEL,
                                 delete_parameters.getTransactionTracker());
        table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
            .emplace(update_log.getFragmentId());
      };
      return callback;
    }
  }


StorageIOFacility::UpdateCallback StorageIOFacility::yieldUpdateCallback ( UpdateTransactionParameters &  update_parameters)
inline

Definition at line 193 of file StorageIOFacility.h.

References catalog_, CHECK, CHECK_EQ, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, Data_Namespace::DataMgr::deleteChunksWithPrefix(), executor_, TableDescriptor::fragmenter, Chunk_NS::Chunk::getChunk(), UpdateLogForFragment::getColumnType(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), UpdateLogForFragment::getEntryAt(), UpdateLogForFragment::getEntryCount(), UpdateLogForFragment::getFragmentId(), UpdateLogForFragment::getFragmentInfo(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForTable(), UpdateLogForFragment::getPhysicalTableId(), UpdateLogForFragment::getResultSet(), UpdateLogForFragment::getRowCount(), getRsBufferNoPadding(), StorageIOFacility::TransactionParameters::getTableDescriptor(), StorageIOFacility::UpdateTransactionParameters::getTargetsMetaInfo(), StorageIOFacility::TransactionParameters::getTransactionTracker(), UpdateLogForFragment::getTranslatedEntryAt(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnCount(), StorageIOFacility::UpdateTransactionParameters::getUpdateColumnNames(), lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable(), Data_Namespace::GPU_LEVEL, i, SQLTypeInfo::is_string(), StorageIOFacility::UpdateTransactionParameters::isVarlenUpdateRequired(), normalized_cpu_threads(), anonymous_namespace{StorageIOFacility.h}::should_recompute_metadata(), t, StorageIOFacility::TransactionParameters::tableIsTemporary(), UNLIKELY, and foreign_storage::update_stats().

Referenced by RelAlgExecutor::executeUpdate().

  {
    using OffsetVector = std::vector<uint64_t>;
    using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
    using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;

    if (update_parameters.isVarlenUpdateRequired()) {
      auto callback = [this, &update_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        std::vector<const ColumnDescriptor*> columnDescriptors;
        std::vector<TargetMetaInfo> sourceMetaInfos;

        for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size();
             idx++) {
          auto& column_name = update_parameters.getUpdateColumnNames()[idx];
          auto target_column =
              catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
          columnDescriptors.push_back(target_column);
          sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
        }

        auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
        auto* fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        fragmenter->updateColumns(
            &catalog_,
            td,
            update_log.getFragmentId(),
            sourceMetaInfos,
            columnDescriptors,
            update_log,
            update_parameters.getUpdateColumnCount(),  // last column of result set
            Data_Namespace::MemoryLevel::CPU_LEVEL,
            update_parameters.getTransactionTracker(),
            executor_);
        table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
            update_log.getFragmentId());
      };
      return callback;
    } else if (update_parameters.tableIsTemporary()) {
      auto callback = [this, &update_parameters](UpdateLogForFragment const& update_log,
                                                 TableUpdateMetadata&) -> void {
        auto rs = update_log.getResultSet();
        CHECK(rs->didOutputColumnar());
        CHECK(rs->isDirectColumnarConversionPossible());
        CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
        CHECK_EQ(rs->colCount(), size_t(1));

        // Temporary table updates require the full projected column
        CHECK_EQ(rs->rowCount(), update_log.getRowCount());

        ChunkKey chunk_key_prefix{catalog_.getCurrentDB().dbId,
                                  update_parameters.getTableDescriptor()->tableId};
        const auto table_lock =
            lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key_prefix);

        auto& fragment_info = update_log.getFragmentInfo();
        const auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
        CHECK(td);
        const auto cd = catalog_.getMetadataForColumn(
            td->tableId, update_parameters.getUpdateColumnNames().front());
        CHECK(cd);
        auto chunk_metadata =
            fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
        CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{catalog_.getCurrentDB().dbId,
                           td->tableId,
                           cd->columnId,
                           fragment_info.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_.getDataMgr(),
                                               chunk_key,
                                               Data_Namespace::MemoryLevel::CPU_LEVEL,
                                               0,
                                               chunk_metadata->second->numBytes,
                                               chunk_metadata->second->numElements);
        CHECK(chunk);
        auto chunk_buffer = chunk->getBuffer();
        CHECK(chunk_buffer);

        auto encoder = chunk_buffer->getEncoder();
        CHECK(encoder);

        auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
            rs.get(), 0, cd->columnType, rs->rowCount());
        auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());

        const auto new_chunk_metadata =
            encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
        CHECK(new_chunk_metadata);

        auto fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        // The fragmenter copy of the fragment info differs from the copy used by the
        // query engine. Update metadata in the fragmenter directly.
        auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
        // TODO: we may want to put this directly in the fragmenter so we are under the
        // fragmenter lock. But, concurrent queries on the same fragmenter should not be
        // allowed in this path.

        fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
        fragment->shadowChunkMetadataMap =
            fragment->getChunkMetadataMap();  // TODO(adb): needed?

        auto& data_mgr = catalog_.getDataMgr();
        if (data_mgr.gpusPresent()) {
          // flush any GPU copies of the updated chunk
          data_mgr.deleteChunksWithPrefix(chunk_key,
                                          Data_Namespace::MemoryLevel::GPU_LEVEL);
        }
      };
      return callback;
    } else {
      auto callback = [this, &update_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        auto entries_per_column = update_log.getEntryCount();
        auto rows_per_column = update_log.getRowCount();
        if (rows_per_column == 0) {
          return;
        }

        OffsetVector column_offsets(rows_per_column);
        ScalarTargetValueVector scalar_target_values(rows_per_column);

        auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
        auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
        auto usable_threads = normalized_cpu_threads();
        if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
          complete_entry_block_size = entries_per_column;
          partial_row_block_size = 0;
          usable_threads = 1;
        }

        std::atomic<size_t> row_idx{0};

        auto process_rows =
            [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
                auto get_entry_at_func,
                uint64_t column_index,
                uint64_t entry_start,
                uint64_t entry_count) -> uint64_t {
          uint64_t entries_processed = 0;
          for (uint64_t entry_index = entry_start;
               entry_index < (entry_start + entry_count);
               entry_index++) {
            const auto& row = get_entry_at_func(entry_index);
            if (row.empty()) {
              continue;
            }

            entries_processed++;
            size_t row_index = row_idx.fetch_add(1);

            CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);

            auto terminal_column_iter = std::prev(row.end());
            const auto frag_offset_scalar_tv =
                boost::get<ScalarTargetValue>(&*terminal_column_iter);
            CHECK(frag_offset_scalar_tv);

            column_offsets[row_index] =
                static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
            scalar_target_values[row_index] =
                boost::get<ScalarTargetValue>(row[column_index]);
          }
          return entries_processed;
        };

        auto get_row_index =
            [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
          return (thread_index * complete_entry_block_size);
        };

        auto const* table_descriptor =
            update_parameters.getTableDescriptor();
        CHECK(table_descriptor);
        auto fragment_id = update_log.getFragmentId();

        // Iterate over each column
        for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
             column_index < update_parameters.getUpdateColumnCount();
             column_index++) {
          row_idx = 0;
          RowProcessingFuturesVector entry_processing_futures;
          entry_processing_futures.reserve(usable_threads);

          auto get_entry_at_func = [&update_log,
                                    &column_index](const size_t entry_index) {
            if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
              return update_log.getTranslatedEntryAt(entry_index);
            } else {
              return update_log.getEntryAt(entry_index);
            }
          };

          for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
            entry_processing_futures.emplace_back(
                std::async(std::launch::async,
                           std::forward<decltype(process_rows)>(process_rows),
                           get_entry_at_func,
                           column_index,
                           get_row_index(i),
                           complete_entry_block_size));
          }
          if (partial_row_block_size) {
            entry_processing_futures.emplace_back(
                std::async(std::launch::async,
                           std::forward<decltype(process_rows)>(process_rows),
                           get_entry_at_func,
                           column_index,
                           get_row_index(usable_threads),
                           partial_row_block_size));
          }

          uint64_t entries_processed(0);
          for (auto& t : entry_processing_futures) {
            t.wait();
            entries_processed += t.get();
          }

          CHECK(row_idx == rows_per_column);

          const auto table_id = update_log.getPhysicalTableId();
          const auto fragmenter = table_descriptor->fragmenter;
          CHECK(fragmenter);
          auto const* target_column = catalog_.getMetadataForColumn(
              table_id, update_parameters.getUpdateColumnNames()[column_index]);
          auto update_stats =
              fragmenter->updateColumn(&catalog_,
                                       table_descriptor,
                                       target_column,
                                       fragment_id,
                                       column_offsets,
                                       scalar_target_values,
                                       update_log.getColumnType(column_index),
                                       Data_Namespace::MemoryLevel::CPU_LEVEL,
                                       update_parameters.getTransactionTracker());
          if (should_recompute_metadata(update_stats)) {
            table_update_metadata.columns_for_metadata_update[target_column].emplace(
                fragment_id);
          }
        }
      };
      return callback;
    }
  }
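As the caller references indicate, RelAlgExecutor::executeUpdate and RelAlgExecutor::executeDelete are the intended consumers: they construct the transaction parameters, request a callback from this facility, and let the query engine invoke it once per affected fragment before committing through the transaction tracker. A hedged sketch of that flow follows; the construction of the parameters object is elided because its exact constructor signature is not shown on this page:

// Orientation-only sketch; the real wiring lives in RelAlgExecutor::executeUpdate.
StorageIOFacility io_facility(executor, catalog);
StorageIOFacility::UpdateTransactionParameters update_params(/* ... */);
StorageIOFacility::UpdateCallback update_cb =
    io_facility.yieldUpdateCallback(update_params);
// The engine then calls update_cb(update_log, table_update_metadata) once per
// fragment touched by the UPDATE statement, and finally commits via
// update_params.getTransactionTracker().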


Member Data Documentation

Catalog_Namespace::Catalog const& StorageIOFacility::catalog_
private

Definition at line 668 of file StorageIOFacility.h.

Referenced by yieldDeleteCallback(), and yieldUpdateCallback().

Executor* StorageIOFacility::executor_
private

Definition at line 667 of file StorageIOFacility.h.

Referenced by yieldUpdateCallback().


The documentation for this class was generated from the following file:

StorageIOFacility.h