72 const std::optional<Fragmenter_Namespace::ChunkUpdateStats>&
update_stats) {
77 CHECK(update_stats->chunk);
78 CHECK(update_stats->chunk->getBuffer());
79 CHECK(update_stats->chunk->getBuffer()->getEncoder());
81 auto chunk_metadata = std::make_shared<ChunkMetadata>();
82 update_stats->chunk->getBuffer()->getEncoder()->getMetadata(chunk_metadata);
83 auto cd = update_stats.value().chunk->getColumnDesc();
84 if (cd->columnType.is_fp()) {
86 if (cd->columnType.get_type() ==
kDOUBLE) {
87 min = chunk_metadata->chunkStats.min.doubleval;
88 max = chunk_metadata->chunkStats.max.doubleval;
89 }
else if (cd->columnType.get_type() ==
kFLOAT) {
90 min = chunk_metadata->chunkStats.min.floatval;
91 max = chunk_metadata->chunkStats.max.floatval;
182 bool varlen_update_required)
208 using OffsetVector = std::vector<uint64_t>;
209 using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
210 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
213 auto callback = [
this, &update_parameters](
216 std::vector<const ColumnDescriptor*> columnDescriptors;
217 std::vector<TargetMetaInfo> sourceMetaInfos;
219 const auto& catalog = update_parameters.
getCatalog();
225 columnDescriptors.push_back(target_column);
230 auto* fragmenter = td->fragmenter.get();
233 fragmenter->updateColumns(
244 table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
245 update_log.getFragmentId());
252 CHECK(rs->didOutputColumnar());
253 CHECK(rs->isDirectColumnarConversionPossible());
255 CHECK_EQ(rs->colCount(), size_t(1));
260 const auto& catalog = update_parameters.
getCatalog();
263 const auto table_lock =
269 const auto cd = catalog.getMetadataForColumn(
272 auto chunk_metadata =
273 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
274 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
275 ChunkKey chunk_key{catalog.getCurrentDB().dbId,
278 fragment_info.fragmentId};
280 &catalog.getDataMgr(),
284 chunk_metadata->second->numBytes,
285 chunk_metadata->second->numElements);
287 auto chunk_buffer = chunk->getBuffer();
290 auto encoder = chunk_buffer->getEncoder();
294 rs.get(), 0, cd->columnType, rs->rowCount());
295 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
297 const auto new_chunk_metadata =
298 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
299 CHECK(new_chunk_metadata);
301 auto fragmenter = td->fragmenter.get();
306 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
311 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
312 fragment->shadowChunkMetadataMap =
313 fragment->getChunkMetadataMapPhysicalCopy();
315 auto& data_mgr = catalog.getDataMgr();
316 if (data_mgr.gpusPresent()) {
318 data_mgr.deleteChunksWithPrefix(chunk_key,
324 auto callback = [
this, &update_parameters](
329 if (rows_per_column == 0) {
333 OffsetVector column_offsets(rows_per_column);
334 ScalarTargetValueVector scalar_target_values(rows_per_column);
340 complete_entry_block_size = entries_per_column;
341 partial_row_block_size = 0;
345 std::atomic<size_t> row_idx{0};
348 [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
349 auto get_entry_at_func,
350 uint64_t column_index,
351 uint64_t entry_start,
352 uint64_t entry_count) -> uint64_t {
353 uint64_t entries_processed = 0;
354 for (uint64_t entry_index = entry_start;
355 entry_index < (entry_start + entry_count);
357 const auto& row = get_entry_at_func(entry_index);
363 size_t row_index = row_idx.fetch_add(1);
367 auto terminal_column_iter = std::prev(row.end());
368 const auto frag_offset_scalar_tv =
369 boost::get<ScalarTargetValue>(&*terminal_column_iter);
370 CHECK(frag_offset_scalar_tv);
372 column_offsets[row_index] =
373 static_cast<uint64_t
>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
374 scalar_target_values[row_index] =
375 boost::get<ScalarTargetValue>(row[column_index]);
377 return entries_processed;
381 [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
382 return (thread_index * complete_entry_block_size);
385 const auto& catalog = update_parameters.
getCatalog();
386 auto const* table_descriptor =
390 if (!table_descriptor) {
392 if (
auto proj_node = dynamic_cast<const RelProject*>(input_source_node)) {
393 if (proj_node->hasPushedDownWindowExpr() ||
394 proj_node->hasWindowFunctionExpr()) {
395 table_id = proj_node->getModifiedTableDescriptor()->tableId;
396 table_descriptor = catalog.getMetadataForTable(table_id);
400 CHECK(table_descriptor);
407 RowProcessingFuturesVector entry_processing_futures;
408 entry_processing_futures.reserve(usable_threads);
410 auto get_entry_at_func = [&update_log,
411 &column_index](
const size_t entry_index) {
419 for (
unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
420 entry_processing_futures.emplace_back(
422 std::forward<decltype(process_rows)>(process_rows),
426 complete_entry_block_size));
428 if (partial_row_block_size) {
429 entry_processing_futures.emplace_back(
431 std::forward<decltype(process_rows)>(process_rows),
434 get_row_index(usable_threads),
435 partial_row_block_size));
438 uint64_t entries_processed(0);
439 for (
auto& t : entry_processing_futures) {
441 entries_processed += t.get();
444 CHECK(row_idx == rows_per_column);
445 const auto fragmenter = table_descriptor->fragmenter;
447 auto const* target_column = catalog.getMetadataForColumn(
449 CHECK(target_column);
451 fragmenter->updateColumn(&catalog,
456 scalar_target_values,
461 table_update_metadata.columns_for_metadata_update[target_column].emplace(
472 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
476 const auto& catalog = delete_parameters.
getCatalog();
479 auto rs = update_log.getResultSet();
480 CHECK(rs->didOutputColumnar());
481 CHECK(rs->isDirectColumnarConversionPossible());
482 CHECK_EQ(rs->colCount(), size_t(1));
485 CHECK_EQ(rs->rowCount(), update_log.getRowCount());
487 const ChunkKey lock_chunk_key{catalog.getCurrentDB().dbId, logical_table_id};
488 const auto table_lock =
491 auto& fragment_info = update_log.getFragmentInfo();
492 const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
494 const auto cd = catalog.getDeletedColumn(td);
497 auto chunk_metadata =
498 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
499 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
500 ChunkKey chunk_key{catalog.getCurrentDB().dbId,
503 fragment_info.fragmentId};
505 &catalog.getDataMgr(),
509 chunk_metadata->second->numBytes,
510 chunk_metadata->second->numElements);
512 auto chunk_buffer = chunk->getBuffer();
515 auto encoder = chunk_buffer->getEncoder();
519 rs.get(), 0, cd->columnType, rs->rowCount());
520 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
522 const auto new_chunk_metadata =
523 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
525 auto fragmenter = td->fragmenter.get();
530 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
535 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
536 fragment->shadowChunkMetadataMap =
537 fragment->getChunkMetadataMapPhysicalCopy();
539 auto& data_mgr = catalog.getDataMgr();
540 if (data_mgr.gpusPresent()) {
542 data_mgr.deleteChunksWithPrefix(chunk_key,
548 auto callback = [
this, &delete_parameters](
553 if (rows_per_column == 0) {
563 complete_row_block_size = rows_per_column;
564 partial_row_block_size = 0;
568 std::atomic<size_t> row_idx{0};
570 auto process_rows = [&update_log, &victim_offsets, &row_idx](
571 uint64_t entry_start, uint64_t entry_count) -> uint64_t {
572 uint64_t entries_processed = 0;
574 for (uint64_t entry_index = entry_start;
575 entry_index < (entry_start + entry_count);
577 auto const row(update_log.
getEntryAt(entry_index));
584 size_t row_index = row_idx.fetch_add(1);
586 auto terminal_column_iter = std::prev(row.end());
587 const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
590 uint64_t fragment_offset =
591 static_cast<uint64_t
>(*(boost::get<int64_t>(scalar_tv)));
592 victim_offsets[row_index] = fragment_offset;
594 return entries_processed;
598 [complete_row_block_size](uint64_t thread_index) -> uint64_t {
599 return thread_index * complete_row_block_size;
602 RowProcessingFuturesVector row_processing_futures;
603 row_processing_futures.reserve(usable_threads);
605 for (
unsigned i = 0; i < (unsigned)usable_threads; i++) {
606 row_processing_futures.emplace_back(
608 std::forward<decltype(process_rows)>(process_rows),
610 complete_row_block_size));
612 if (partial_row_block_size) {
613 row_processing_futures.emplace_back(
615 std::forward<decltype(process_rows)>(process_rows),
616 get_row_index(usable_threads),
617 partial_row_block_size));
620 uint64_t rows_processed(0);
621 for (
auto& t : row_processing_futures) {
623 rows_processed += t.get();
626 const auto& catalog = delete_parameters.
getCatalog();
627 auto const* table_descriptor =
629 CHECK(table_descriptor);
631 auto* fragmenter = table_descriptor->fragmenter.get();
634 auto const* deleted_column_desc = catalog.getDeletedColumn(table_descriptor);
635 CHECK(deleted_column_desc);
636 fragmenter->updateColumn(&catalog,
645 table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
659 const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
664 auto rs_buffer_size = padded_size * row_count;
665 auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
666 rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
668 if (type_size < padded_size) {
672 auto src_ptr = rs_buffer.get();
673 auto dst_ptr = rs_buffer.get();
674 if (column_type.
is_fp()) {
676 CHECK(padded_size ==
sizeof(
double));
677 for (
size_t i = 0; i < row_count; i++) {
678 const auto old_val = *
reinterpret_cast<double*
>(may_alias_ptr(src_ptr));
679 auto new_val =
static_cast<float>(old_val);
680 std::memcpy(dst_ptr, &new_val, type_size);
681 dst_ptr += type_size;
682 src_ptr += padded_size;
686 for (
size_t i = 0; i < row_count; i++) {
687 std::memcpy(dst_ptr, src_ptr, type_size);
688 dst_ptr += type_size;
689 src_ptr += padded_size;
SQLTypeInfo getColumnType(const size_t col_idx) const
UpdateValuesStats new_values_stats
auto isVarlenUpdateRequired() const
std::vector< int > ChunkKey
StorageIOFacility::TransactionLog transaction_tracker_
HOST DEVICE int get_size() const
Class for a per-database catalog. Also includes metadata for the current database and the current use...
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog & getCatalog() const
std::vector< TargetMetaInfo > UpdateTargetTypeList
const Catalog_Namespace::Catalog & catalog_
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
UpdateTargetColumnNamesList update_column_names_
bool should_recompute_metadata(const std::optional< Fragmenter_Namespace::ChunkUpdateStats > &update_stats)
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
bool is_chunk_min_max_updated(const Fragmenter_Namespace::ChunkUpdateStats &update_stats, int64_t min, int64_t max)
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
virtual ~TransactionParameters()=default
bool g_enable_auto_metadata_update
int normalized_cpu_threads() const
HOST DEVICE SQLTypes get_type() const
UpdateTransactionParameters & operator=(UpdateTransactionParameters const &other)=delete
std::function< void(const UpdateLogForFragment &, TableUpdateMetadata &)> Callback
std::vector< std::string > UpdateTargetColumnNamesList
std::vector< uint64_t > UpdateTargetOffsetList
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
future< Result > async(Fn &&fn, Args &&...args)
std::vector< TargetValue > getEntryAt(const size_t index) const override
int get_logical_size() const
decltype(FragmentInfoType::fragmentId) const getFragmentId() const
const DBMetadata & getCurrentDB() const
auto getUpdateColumnCount() const
decltype(FragmentInfoType::physicalTableId) const getPhysicalTableId() const
FragmentInfoType const & getFragmentInfo() const
DeleteTransactionParameters & operator=(DeleteTransactionParameters const &other)=delete
DeleteTransactionParameters(const TableDescriptorType *table_descriptor, const Catalog_Namespace::Catalog &catalog)
UpdateTargetTypeList const & targets_meta_
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
const RelAlgNode * input_source_node_
void checkpointWithAutoRollback(const int logical_table_id) const
std::unique_ptr< TransactionLog > TransactionLogPtr
std::function< bool(std::string const &)> ColumnValidationFunction
auto const & getUpdateColumnNames() const
bool table_is_temporary(const TableDescriptor *const td)
auto getTargetsMetaInfoSize() const
Data_Namespace::MemoryLevel persistenceLevel
void setInputSourceNode(const RelAlgNode *input_source_node)
auto const * getTableDescriptor() const
TableDescriptorType const * table_descriptor_
UpdateLogForFragment::Callback UpdateCallback
UpdelRoll ModifyTransactionTracker
StorageIOFacility::TransactionLog & getTransactionTracker()
std::vector< uint64_t > DeleteVictimOffsetList
auto tableIsTemporary() const
bool is_dict_encoded_string() const
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
const RelAlgNode * getInputSourceNode() const
TransactionParameters(const TableDescriptorType *table_descriptor, const Catalog_Namespace::Catalog &catalog)
auto getResultSet() const
auto const & getTargetsMetaInfo() const
void finalizeTransaction(const Catalog_Namespace::Catalog &catalog)
size_t const getRowCount() const override
bool varlen_update_required_
StorageIOFacility(Executor *executor)
UpdateValuesStats old_values_stats
UpdateTransactionParameters(TableDescriptorType const *table_descriptor, const Catalog_Namespace::Catalog &catalog, UpdateTargetColumnNamesList const &update_column_names, UpdateTargetTypeList const &target_types, bool varlen_update_required)
size_t const getEntryCount() const override
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue