86 bool varlen_update_required)
116 using OffsetVector = std::vector<uint64_t>;
117 using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
118 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
123 std::vector<const ColumnDescriptor*> columnDescriptors;
124 std::vector<TargetMetaInfo> sourceMetaInfos;
131 columnDescriptors.push_back(target_column);
139 fragmenter->updateColumns(
142 update_log.getFragmentId(),
155 auto rs = update_log.getResultSet();
156 CHECK(rs->didOutputColumnar());
157 CHECK(rs->isDirectColumnarConversionPossible());
159 CHECK_EQ(rs->colCount(), size_t(1));
162 CHECK_EQ(rs->rowCount(), update_log.getRowCount());
166 const auto table_lock =
169 auto& fragment_info = update_log.getFragmentInfo();
175 auto chunk_metadata =
176 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
177 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
181 fragment_info.fragmentId};
187 chunk_metadata->second->numBytes,
188 chunk_metadata->second->numElements);
190 auto chunk_buffer = chunk->getBuffer();
193 auto encoder = chunk_buffer->getEncoder();
197 rs.get(), 0, cd->columnType, rs->rowCount());
198 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
200 const auto new_chunk_metadata =
201 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
202 CHECK(new_chunk_metadata);
204 auto fragmenter = td->fragmenter.get();
209 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
214 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
215 fragment->shadowChunkMetadataMap =
216 fragment->getChunkMetadataMap();
219 if (data_mgr.gpusPresent()) {
229 auto entries_per_column = update_log.getEntryCount();
230 auto rows_per_column = update_log.getRowCount();
231 if (rows_per_column == 0) {
235 OffsetVector column_offsets(rows_per_column);
236 ScalarTargetValueVector scalar_target_values(rows_per_column);
242 complete_entry_block_size = entries_per_column;
243 partial_row_block_size = 0;
247 std::atomic<size_t> row_idx{0};
250 [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
251 auto get_entry_at_func,
252 uint64_t column_index,
253 uint64_t entry_start,
254 uint64_t entry_count) -> uint64_t {
255 uint64_t entries_processed = 0;
256 for (uint64_t entry_index = entry_start;
257 entry_index < (entry_start + entry_count);
259 const auto& row = get_entry_at_func(entry_index);
265 size_t row_index = row_idx.fetch_add(1);
269 auto terminal_column_iter = std::prev(row.end());
270 const auto frag_offset_scalar_tv =
271 boost::get<ScalarTargetValue>(&*terminal_column_iter);
272 CHECK(frag_offset_scalar_tv);
274 column_offsets[row_index] =
275 static_cast<uint64_t
>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
276 scalar_target_values[row_index] =
277 boost::get<ScalarTargetValue>(row[column_index]);
279 return entries_processed;
283 [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
284 return (thread_index * complete_entry_block_size);
292 RowProcessingFuturesVector entry_processing_futures;
293 entry_processing_futures.reserve(usable_threads);
295 auto get_entry_at_func = [&update_log,
296 &column_index](
const size_t entry_index) {
297 if (
UNLIKELY(update_log.getColumnType(column_index).is_string())) {
298 return update_log.getTranslatedEntryAt(entry_index);
300 return update_log.getEntryAt(entry_index);
304 for (
unsigned i = 0; i < static_cast<unsigned>(usable_threads);
i++) {
305 entry_processing_futures.emplace_back(
306 std::async(std::launch::async,
307 std::forward<decltype(process_rows)>(process_rows),
311 complete_entry_block_size));
313 if (partial_row_block_size) {
314 entry_processing_futures.emplace_back(
315 std::async(std::launch::async,
316 std::forward<decltype(process_rows)>(process_rows),
319 get_row_index(usable_threads),
320 partial_row_block_size));
323 uint64_t entries_processed(0);
324 for (
auto&
t : entry_processing_futures) {
326 entries_processed +=
t.get();
329 CHECK(row_idx == rows_per_column);
331 const auto table_id = update_log.getPhysicalTableId();
332 auto const* table_descriptor =
334 CHECK(table_descriptor);
335 const auto fragmenter = table_descriptor->fragmenter;
343 update_log.getFragmentId(),
345 scalar_target_values,
346 update_log.getColumnType(column_index),
357 using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
361 auto rs = update_log.getResultSet();
362 CHECK(rs->didOutputColumnar());
363 CHECK(rs->isDirectColumnarConversionPossible());
364 CHECK_EQ(rs->colCount(), size_t(1));
367 CHECK_EQ(rs->rowCount(), update_log.getRowCount());
370 update_log.getPhysicalTableId()};
371 const auto table_lock =
374 auto& fragment_info = update_log.getFragmentInfo();
380 auto chunk_metadata =
381 fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
382 CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
386 fragment_info.fragmentId};
392 chunk_metadata->second->numBytes,
393 chunk_metadata->second->numElements);
395 auto chunk_buffer = chunk->getBuffer();
398 auto encoder = chunk_buffer->getEncoder();
402 rs.get(), 0, cd->columnType, rs->rowCount());
403 auto buffer =
reinterpret_cast<int8_t*
>(owned_buffer.get());
405 const auto new_chunk_metadata =
406 encoder->appendData(buffer, rs->rowCount(), cd->columnType,
false, 0);
408 auto fragmenter = td->fragmenter.get();
413 auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
418 fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
419 fragment->shadowChunkMetadataMap =
420 fragment->getChunkMetadataMap();
423 if (data_mgr.gpusPresent()) {
433 auto entries_per_column = update_log.getEntryCount();
434 auto rows_per_column = update_log.getRowCount();
435 if (rows_per_column == 0) {
445 complete_row_block_size = rows_per_column;
446 partial_row_block_size = 0;
450 std::atomic<size_t> row_idx{0};
452 auto process_rows = [&update_log, &victim_offsets, &row_idx](
453 uint64_t entry_start, uint64_t entry_count) -> uint64_t {
454 uint64_t entries_processed = 0;
456 for (uint64_t entry_index = entry_start;
457 entry_index < (entry_start + entry_count);
459 auto const row(update_log.getEntryAt(entry_index));
466 size_t row_index = row_idx.fetch_add(1);
468 auto terminal_column_iter = std::prev(row.end());
469 const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
472 uint64_t fragment_offset =
473 static_cast<uint64_t
>(*(boost::get<int64_t>(scalar_tv)));
474 victim_offsets[row_index] = fragment_offset;
476 return entries_processed;
480 [complete_row_block_size](uint64_t thread_index) -> uint64_t {
481 return thread_index * complete_row_block_size;
484 RowProcessingFuturesVector row_processing_futures;
485 row_processing_futures.reserve(usable_threads);
487 for (
unsigned i = 0;
i < (unsigned)usable_threads;
i++) {
488 row_processing_futures.emplace_back(
489 std::async(std::launch::async,
490 std::forward<decltype(process_rows)>(process_rows),
492 complete_row_block_size));
494 if (partial_row_block_size) {
495 row_processing_futures.emplace_back(
496 std::async(std::launch::async,
497 std::forward<decltype(process_rows)>(process_rows),
498 get_row_index(usable_threads),
499 partial_row_block_size));
502 uint64_t rows_processed(0);
503 for (
auto&
t : row_processing_futures) {
505 rows_processed +=
t.get();
508 auto const* table_descriptor =
511 auto* fragmenter = table_descriptor->fragmenter.get();
515 CHECK(deleted_column_desc);
519 update_log.getFragmentId(),
522 update_log.getColumnType(0),
537 const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
542 auto rs_buffer_size = padded_size * row_count;
543 auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
544 rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);
546 if (type_size < padded_size) {
550 auto src_ptr = rs_buffer.get();
551 auto dst_ptr = rs_buffer.get();
552 if (column_type.
is_fp()) {
554 CHECK(padded_size ==
sizeof(
double));
555 for (
size_t i = 0;
i < row_count;
i++) {
556 const auto old_val = *
reinterpret_cast<double*
>(may_alias_ptr(src_ptr));
557 auto new_val =
static_cast<float>(old_val);
558 std::memcpy(dst_ptr, &new_val, type_size);
559 dst_ptr += type_size;
560 src_ptr += padded_size;
564 for (
size_t i = 0;
i < row_count;
i++) {
565 std::memcpy(dst_ptr, src_ptr, type_size);
566 dst_ptr += type_size;
567 src_ptr += padded_size;
auto isVarlenUpdateRequired() const
std::vector< int > ChunkKey
StorageIOFacility::TransactionLog transaction_tracker_
HOST DEVICE int get_size() const
void finalizeTransaction()
Class for a per-database catalog. Also includes metadata for the current database and the current use...
const ColumnDescriptor * getDeletedColumn(const TableDescriptor *td) const
std::vector< TargetMetaInfo > UpdateTargetTypeList
StorageIOFacility::UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
UpdateTargetColumnNamesList update_column_names_
Data_Namespace::DataMgr & getDataMgr() const
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
static std::unique_ptr< int8_t[]> getRsBufferNoPadding(const ResultSet *rs, size_t col_idx, const SQLTypeInfo &column_type, size_t row_count)
virtual ~TransactionParameters()=default
int normalized_cpu_threads() const
HOST DEVICE SQLTypes get_type() const
UpdateTransactionParameters & operator=(UpdateTransactionParameters const &other)=delete
DeleteTransactionParameters(const bool table_is_temporary)
std::function< void(const UpdateLogForFragment &)> Callback
TransactionParameters(const bool table_is_temporary)
std::vector< std::string > UpdateTargetColumnNamesList
std::vector< uint64_t > UpdateTargetOffsetList
StorageIOFacility(Executor *executor, Catalog_Namespace::Catalog const &catalog)
StorageIOFacility::UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
int get_logical_size() const
const DBMetadata & getCurrentDB() const
auto getUpdateColumnCount() const
Catalog_Namespace::Catalog const & catalog_
DeleteTransactionParameters & operator=(DeleteTransactionParameters const &other)=delete
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
UpdateTargetTypeList const & targets_meta_
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::unique_ptr< TransactionLog > TransactionLogPtr
std::function< bool(std::string const &)> ColumnValidationFunction
auto const & getUpdateColumnNames() const
auto tableIsTemporary() const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
TableDescriptorType const * table_descriptor_
bool table_is_temporary(const TableDescriptor *const td)
auto getTargetsMetaInfoSize() const
UpdateLogForFragment::Callback UpdateCallback
UpdelRoll ModifyTransactionTracker
StorageIOFacility::TransactionLog & getTransactionTracker()
std::vector< uint64_t > DeleteVictimOffsetList
auto tableIsTemporary() const
UpdateTransactionParameters(TableDescriptorType const *table_desc, UpdateTargetColumnNamesList const &update_column_names, UpdateTargetTypeList const &target_types, bool varlen_update_required)
bool is_dict_encoded_string() const
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
auto const & getTargetsMetaInfo() const
auto const * getTableDescriptor() const
bool varlen_update_required_
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue