#include <boost/variant.hpp>
#include <boost/variant/get.hpp>

namespace Fragmenter_Namespace {
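
// Fragment-level UPDATE/DELETE support: helpers that load a fragment's chunks,
// convert chunk buffers back into InsertData blocks, apply scalar and columnar
// updates, and vacuum (compact) rows that have been flagged as deleted.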
for (auto& t : threads) {
const int fragment_id,
const std::vector<uint64_t>& frag_offsets,
std::vector<ScalarTargetValue>(1, rhs_value),
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
    if (!cd->isVirtualCol) {
      chunk_meta_it->second->numBytes,
      chunk_meta_it->second->numElements);
      chunks.push_back(chunk);
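
// ChunkToInsertDataConverter implementations follow: each converter reads the
// values of one column out of a chunk buffer and repackages them as a
// DataBlockPtr for InsertData (scalar, fixed-length array, array, string, and
// date variants).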
template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
using ColumnDataPtr =
    std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
insertData.data.push_back(dataBlock);
column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
insertData.data.push_back(dataBlock);
size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
size_t src_value_size =
(*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
insertData.data.push_back(dataBlock);
template <typename BUFFER_DATA_TYPE>
using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
auto insert_value = static_cast<int64_t>(buffer_value);
insertData.data.push_back(dataBlock);
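
// updateColumns(): applies a multi-column UPDATE to a single fragment. It
// fetches the fragment's chunks, builds TargetValueConverters for the columns
// being updated and ChunkToInsertDataConverters for the remaining columns,
// converts each affected source row back into columnar InsertData (in parallel
// for large row counts), and marks the original rows as deleted in the
// fragment's delete column.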
const int fragmentId,
const std::vector<TargetMetaInfo> sourceMetaInfo,
const std::vector<const ColumnDescriptor*> columnDescriptors,
const size_t indexOffFragmentOffsetColumn,
Executor* executor) {
size_t num_rows = sourceDataProvider.getRowCount();

auto& fragment = *fragment_ptr;
std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
get_chunks(catalog, td, fragment, memoryLevel, chunks);
std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
    columnDescriptors.size());
std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
size_t indexOfDeletedColumn{0};
std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
  auto chunk = chunks[indexOfChunk];
  const auto chunk_cd = chunk->getColumnDesc();
  if (chunk_cd->isDeletedCol) {
    indexOfDeletedColumn = chunk_cd->columnId;
    deletedChunk = chunk;
auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                   columnDescriptors.end(),
  return cd->columnId == chunk_cd->columnId;
if (targetColumnIt != columnDescriptors.end()) {
  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
      targetDescriptor->columnType,
      !targetDescriptor->columnType.get_notnull(),
      sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
          ? executor->getStringDictionaryProxy(
                sourceDataMetaInfo.get_type_info().getStringDictKey(),
                executor->getRowSetMemoryOwner(),
  auto converter = factory.create(param);
  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
  if (targetDescriptor->columnType.is_geometry()) {
    switch (targetDescriptor->columnType.get_type()) {
if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
  std::unique_ptr<ChunkToInsertDataConverter> converter;
  if (chunk_cd->columnType.is_fixlen_array()) {
    converter =
        std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
  } else if (chunk_cd->columnType.is_string()) {
    converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
  } else if (chunk_cd->columnType.is_geometry()) {
    converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
  } else {
    converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
  }
  chunkConverters.push_back(std::move(converter));
} else if (chunk_cd->columnType.is_date_in_days()) {
  std::unique_ptr<ChunkToInsertDataConverter> converter;
  const size_t physical_size = chunk_cd->columnType.get_size();
  if (physical_size == 2) {
    converter =
        std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
  } else if (physical_size == 4) {
    converter =
        std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
  }
  chunkConverters.push_back(std::move(converter));
std::unique_ptr<ChunkToInsertDataConverter> converter;
int logical_size = logical_type.get_size();
int physical_size = chunk_cd->columnType.get_size();
logical_size = physical_size;
if (8 == physical_size) {
  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
      num_rows, chunk.get());
} else if (4 == physical_size) {
  if (8 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
        num_rows, chunk.get());
  } else {
    converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
        num_rows, chunk.get());
  }
} else if (2 == chunk_cd->columnType.get_size()) {
  if (8 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
        num_rows, chunk.get());
  } else if (4 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
        num_rows, chunk.get());
  } else {
    converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
        num_rows, chunk.get());
  }
} else if (1 == chunk_cd->columnType.get_size()) {
  if (8 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
        num_rows, chunk.get());
  } else if (4 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
        num_rows, chunk.get());
  } else if (2 == logical_size) {
    converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
        num_rows, chunk.get());
  } else {
    converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
        num_rows, chunk.get());
  }
}
chunkConverters.push_back(std::move(converter));
bool* deletedChunkBuffer =
    reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());

std::atomic<size_t> row_idx{0};

auto row_converter = [&sourceDataProvider,
                      &sourceDataConverters,
                      &indexOffFragmentOffsetColumn,
                      &row_idx](size_t indexOfEntry) -> void {
  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
  size_t indexOfRow = row_idx.fetch_add(1);
  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
    if (sourceDataConverters[col]) {
      const auto& mapd_variant = row[col];
      sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
    chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
  deletedChunkBuffer[indexInChunkBuffer] = true;
bool can_go_parallel = num_rows > 20000;

if (can_go_parallel) {
  std::vector<std::future<void>> worker_threads;
  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
       i < num_worker_threads && start_entry < num_entries;
       ++i, start_entry += stride) {
    const auto end_entry = std::min(start_entry + stride, num_rows);
    [&row_converter](const size_t start, const size_t end) {
      for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
        row_converter(indexOfRow);
  for (auto& child : worker_threads) {
  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
    row_converter(entryIdx);
for (size_t i = 0; i < chunkConverters.size(); i++) {
  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
for (size_t i = 0; i < sourceDataConverters.size(); i++) {
  if (sourceDataConverters[i]) {
    sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
insert_data.numRows = num_rows;
auto chunkMetadata =
    updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
chunkMetadata->chunkStats.max.boolval = 1;
if (!deletedChunk->getBuffer()->hasEncoder()) {
  deletedChunk->initEncoder();
deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
deletedChunk->getBuffer()->setUpdated();
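
// update_metadata() overloads: fold a single updated value (and the value it
// replaced) into the running ChunkUpdateStats, for integer and floating-point
// columns respectively.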
int64_t const updated_val,
int64_t const old_val,
NullSentinelSupplier s = NullSentinelSupplier()) {

double const updated_val,
double const old_val,
NullSentinelSupplier s = NullSentinelSupplier()) {
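
// updateColumn(): applies a single-column UPDATE where the right-hand side is
// a literal (or one literal per targeted row). Work is split across threads by
// row ranges; each thread decodes the stored value, writes the new one with
// overflow/null validation, and accumulates per-thread ChunkUpdateStats that
// are merged afterwards.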
const int fragment_id,
const std::vector<uint64_t>& frag_offsets,
const std::vector<ScalarTargetValue>& rhs_values,
const auto nrow = frag_offsets.size();
const auto n_rhs_values = rhs_values.size();
CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
auto& fragment = *fragment_ptr;
auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
chunk_meta_it->second->numBytes,
chunk_meta_it->second->numElements);
std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
std::vector<std::future<void>> threads;
const auto segsz = (nrow + ncore - 1) / ncore;
auto dbuf = chunk->getBuffer();
auto dbuf_addr = dbuf->getMemoryPtr();
for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
  lhs_type, &decimalOverflowValidator);
  lhs_type, &dateDaysOverflowValidator);
  stringDict = dictDesc->stringDict.get();
  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
    const auto roffs = frag_offsets[r];
    auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
if (const auto vp = boost::get<int64_t>(sv)) {
  if (nullptr == dictDesc) {
    throw std::runtime_error(
        "UPDATE does not support cast from string literal to string "
  auto stringDict = dictDesc->stringDict.get();
if (const auto vp = boost::get<int64_t>(sv)) {
  throw std::runtime_error("UPDATE does not support cast to string.");
  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
  if (lhs_type.is_date_in_days()) {
  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
  if (lhs_type.is_decimal()) {
    nullAwareDecimalOverflowValidator.validate<int64_t>(v);
    get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
    int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                            lhs_type.get_notnull() == false)
        lhs_type, update_stats_per_thread[c], target_value, old_val);
    auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
    auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
    if (positive_v_and_negative_d || negative_v_and_positive_d) {
      throw std::runtime_error(
  if (lhs_type.is_date_in_days()) {
    if (lhs_type.get_size() == 2) {
      nullAwareDateOverflowValidator.validate<int16_t>(v);
      nullAwareDateOverflowValidator.validate<int32_t>(v);
    get_scalar<int64_t>(data_ptr, lhs_type, days);
    int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                            lhs_type.get_notnull() == false)
                               ? NullSentinelSupplier()(lhs_type, v)
        lhs_type, update_stats_per_thread[c], target_value, old_val);
    int64_t target_value;
        lhs_type, update_stats_per_thread[c], target_value, old_val);
    update_stats_per_thread[c],
} else if (const auto vp = boost::get<double>(sv)) {
  throw std::runtime_error("UPDATE does not support cast to string.");
  get_scalar<double>(data_ptr, lhs_type, old_val);
  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
  if (lhs_type.is_integer()) {
        lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
  } else if (lhs_type.is_fp()) {
        lhs_type, update_stats_per_thread[c], double(v), double(old_val));
    UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                     "LHS with a floating RHS.";
} else if (const auto vp = boost::get<float>(sv)) {
  throw std::runtime_error("UPDATE does not support cast to string.");
  get_scalar<float>(data_ptr, lhs_type, old_val);
  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
  if (lhs_type.is_integer()) {
        lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
    update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
} else if (const auto vp = boost::get<NullableString>(sv)) {
  const auto s = boost::get<std::string>(vp);
  const auto sval = s ? *s : std::string("");
  decltype(stringDict->getOrAdd(sval)) sidx;
  sidx = stringDict->getOrAdd(sval);
  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
      lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
} else if (sval.size() > 0) {
  auto dval = std::atof(sval.data());
  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
} else if (lhs_type.is_time()) {
  throw std::runtime_error(
      "Date/Time/Timestamp update not supported through translated "
  get_scalar<double>(data_ptr, lhs_type, old_val);
  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
      lhs_type, update_stats_per_thread[c], double(dval), old_val);
  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
      lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
  update_stats_per_thread[c].new_values_stats.has_null = true;
if (deleted_offsets.size() > 0) {
  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
for (size_t c = 0; c < ncore; ++c) {
      update_stats_per_thread[c].new_values_stats);
      update_stats_per_thread[c].old_values_stats);
CHECK_GT(fragment.shadowNumTuples, size_t(0));
update_stats.chunk = chunk;
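
// updateColumnMetadata(): after an update, refresh the chunk's encoder stats
// (min/max/has_null) from the accumulated new-value statistics, then rebuild
// the chunk metadata that the UpdelRoll publishes on commit.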
std::shared_ptr<Chunk_NS::Chunk> chunk,
auto buffer = chunk->getBuffer();
auto encoder = buffer->getEncoder();
auto update_stats = [&encoder](auto min, auto max, auto has_null) {
  static_assert(std::is_same<decltype(min), decltype(max)>::value,
                "Type mismatch on min/max");
  encoder->updateStats(decltype(min)(), true);
  encoder->updateStats(min, false);
  encoder->updateStats(max, false);
} else if (lhs_type.is_fp()) {
} else if (lhs_type.is_decimal()) {
  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
} else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
           !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
auto chunk_metadata =
buffer->getEncoder()->getMetadata(chunk_metadata);

auto& fragmentInfo = *key.second;
fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
  if (!cd->isVirtualCol) {
    chunk_meta_it->second->numBytes,
    chunk_meta_it->second->numElements);
    chunks.push_back(chunk);
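
// getVacuumOffsets(): scan the fragment's delete column in parallel, with each
// worker thread collecting the row offsets in its range that are marked
// deleted, then concatenate the per-thread lists into one vector of offsets to
// vacuum.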
const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
const auto data_buffer = chunk->getBuffer();
const auto data_addr = data_buffer->getMemoryPtr();
const size_t nrows_in_chunk = data_buffer->size();
const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
std::vector<std::vector<uint64_t>> deleted_offsets;
deleted_offsets.resize(ncore);
std::vector<std::future<void>> threads;
for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
  const auto ithread = rbegin / segsz;
  CHECK(ithread < deleted_offsets.size());
  deleted_offsets[ithread].reserve(segsz);
  for (size_t r = rbegin; r < rend; ++r) {
    deleted_offsets[ithread].push_back(r);
std::vector<uint64_t> all_deleted_offsets;
for (size_t i = 0; i < ncore; ++i) {
  all_deleted_offsets.insert(
      all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
return all_deleted_offsets;
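
// set_chunk_stats() / set_chunk_metadata(): small helpers used while
// vacuuming. The first folds one decoded value into running min/max/has_null
// stats; the second rewrites a chunk's metadata (element count and byte size)
// after rows have been removed.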
template <typename T>
const auto is_null = get_scalar<T>(data_addr, col_type, v);
has_null = has_null || (can_be_null && is_null);
const std::shared_ptr<Chunk_NS::Chunk>& chunk,
const size_t nrows_to_keep,
auto cd = chunk->getColumnDesc();
auto data_buffer = chunk->getBuffer();
auto chunkMetadata =
chunkMetadata->numElements = nrows_to_keep;
chunkMetadata->numBytes = data_buffer->size();
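
// vacuum_fixlen_rows(): compact a fixed-length column in place. Walks the
// sorted deleted-row offsets, memmove()s each surviving block of rows down
// over the holes, and returns the number of bytes kept.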
const std::shared_ptr<Chunk_NS::Chunk>& chunk,
const std::vector<uint64_t>& frag_offsets) {
const auto cd = chunk->getColumnDesc();
const auto& col_type = cd->columnType;
auto data_buffer = chunk->getBuffer();
auto data_addr = data_buffer->getMemoryPtr();
    col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
int64_t irow_of_blk_to_keep = 0;
int64_t irow_of_blk_to_fill = 0;
size_t nbytes_fix_data_to_keep = 0;
auto nrows_to_vacuum = frag_offsets.size();
for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
  auto is_last_one = irow == nrows_to_vacuum;
  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
  auto maddr_to_vacuum = data_addr;
  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
  if (nrows_to_keep > 0) {
    auto nbytes_to_keep = nrows_to_keep * element_size;
    if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
      memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
              maddr_to_vacuum + irow_of_blk_to_keep * element_size,
    irow_of_blk_to_fill += nrows_to_keep;
    nbytes_fix_data_to_keep += nbytes_to_keep;
  irow_of_blk_to_keep = irow_to_vacuum + 1;
return nbytes_fix_data_to_keep;
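
// Variable-length vacuum helpers: get_null_padding() determines how many
// leading padding bytes must survive compaction of a varlen-array chunk,
// get_var_len_null_array_indexes() records which surviving rows are NULL
// arrays (marked by negative index entries) so they can be re-flagged after
// compaction, and get_buffer_offset() reads an index entry while accounting
// for that negative NULL-array encoding.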
const std::vector<uint64_t>& frag_offsets,
size_t fragment_row_count) {
if (is_varlen_array) {
  size_t first_non_deleted_row_index{0};
  for (auto deleted_offset : frag_offsets) {
    if (first_non_deleted_row_index < deleted_offset) {
    first_non_deleted_row_index++;
  CHECK_LT(first_non_deleted_row_index, fragment_row_count);
  if (first_non_deleted_row_index == 0) {
    return index_array[0];
  if (index_array[first_non_deleted_row_index + 1] < 0) {
    size_t first_non_zero_offset{0};
    for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
      if (index_array[i] != 0) {
        first_non_zero_offset = index_array[i];
    CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
        first_non_zero_offset);
const std::vector<uint64_t>& frag_offsets,
size_t fragment_row_count) {
std::set<size_t> null_array_indexes;
size_t frag_offset_index{0};
size_t vacuum_offset{0};
for (size_t i = 0; i < fragment_row_count; i++) {
  if (frag_offset_index < frag_offsets.size() &&
      i == frag_offsets[frag_offset_index]) {
    frag_offset_index++;
  } else if (index_array[i + 1] < 0) {
    null_array_indexes.emplace(i - vacuum_offset);
return null_array_indexes;
auto offset = index_array[index];
CHECK(is_varlen_array);
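
// vacuum_varlen_rows(): compact a variable-length column (string, array,
// geometry). Surviving variable-length payloads are memmove()d down in the
// data buffer, the per-row index (offset) array is rewritten to point at the
// new payload positions, and NULL-array entries get their negative markers
// restored. Returns the number of payload bytes kept.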
const std::shared_ptr<Chunk_NS::Chunk>& chunk,
const std::vector<uint64_t>& frag_offsets) {
auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
auto data_buffer = chunk->getBuffer();
auto index_buffer = chunk->getIndexBuf();
CHECK(index_buffer);
auto data_addr = data_buffer->getMemoryPtr();
auto indices_addr = index_buffer->getMemoryPtr();
CHECK(indices_addr);
int64_t irow_of_blk_to_keep = 0;
int64_t irow_of_blk_to_fill = 0;
size_t nbytes_fix_data_to_keep = 0;
size_t null_padding =
    get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
size_t nbytes_var_data_to_keep = null_padding;
    chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
auto nrows_to_vacuum = frag_offsets.size();
for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
  auto is_last_one = irow == nrows_to_vacuum;
  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
  auto maddr_to_vacuum = data_addr;
  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
  if (nrows_to_keep > 0) {
    auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
    auto deleted_row_start_offset =
    auto kept_row_start_offset =
    auto nbytes_to_keep =
        (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
        kept_row_start_offset;
    if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
      if (nbytes_to_keep > 0) {
        memmove(data_addr + ibyte_var_data_to_keep,
                data_addr + kept_row_start_offset,
      const auto base_offset = kept_row_start_offset;
      for (int64_t i = 0; i < nrows_to_keep; ++i) {
        auto update_index = irow_of_blk_to_keep + i;
        index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
    nbytes_var_data_to_keep += nbytes_to_keep;
    maddr_to_vacuum = indices_addr;
    constexpr static auto index_element_size = sizeof(StringOffsetT);
    nbytes_to_keep = nrows_to_keep * index_element_size;
    if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
      memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
              maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
    irow_of_blk_to_fill += nrows_to_keep;
    nbytes_fix_data_to_keep += nbytes_to_keep;
  irow_of_blk_to_keep = irow_to_vacuum + 1;

index_array[0] = null_padding;
auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
if (!is_varlen_array) {
  CHECK(null_array_indexes.empty());
for (auto index : null_array_indexes) {
  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
return nbytes_var_data_to_keep;
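
// compactRows(): drives the vacuum for one fragment. For every chunk it runs
// either the fixed-length or variable-length vacuum on a worker thread,
// shrinks the buffers and encoder element counts, recomputes chunk stats for
// the surviving rows, and finally records the fragment's new tuple count in
// the UpdelRoll.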
const int fragment_id,
const std::vector<uint64_t>& frag_offsets,
auto& fragment = *fragment_ptr;
const auto ncol = chunks.size();
std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
std::vector<std::future<void>> threads;
auto nrows_to_vacuum = frag_offsets.size();
auto nrows_in_fragment = fragment.getPhysicalNumTuples();
auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
for (size_t ci = 0; ci < chunks.size(); ++ci) {
  auto chunk = chunks[ci];
  const auto cd = chunk->getColumnDesc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->getBuffer();
  auto index_buffer = chunk->getIndexBuf();
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
  bool is_varlen = col_type.is_varlen_indeed();
auto fixlen_vacuum =
    [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_fix_data_to_keep;
      if (nrows_to_keep == 0) {
        nbytes_fix_data_to_keep = 0;
      data_buffer->getEncoder()->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_fix_data_to_keep);
      data_buffer->setUpdated();
      auto daddr = data_addr;
      auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
      data_buffer->getEncoder()->resetChunkStats();
      for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
        if (col_type.is_fixlen_array()) {
          encoder->updateMetadata((int8_t*)daddr);
        } else if (col_type.is_fp()) {
          update_stats_per_thread[ci].new_values_stats.has_null,
          update_stats_per_thread[ci].new_values_stats.min_double,
          update_stats_per_thread[ci].new_values_stats.max_double);
          update_stats_per_thread[ci].new_values_stats.has_null,
          update_stats_per_thread[ci].new_values_stats.min_int64t,
          update_stats_per_thread[ci].new_values_stats.max_int64t);
auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
  size_t nbytes_var_data_to_keep;
  if (nrows_to_keep == 0) {
    nbytes_var_data_to_keep = 0;
  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
  data_buffer->setSize(nbytes_var_data_to_keep);
  data_buffer->setUpdated();
  index_buffer->setSize(sizeof(*index_array) *
                        (nrows_to_keep ? 1 + nrows_to_keep : 0));
  index_buffer->setUpdated();
updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
for (size_t ci = 0; ci < chunks.size(); ++ci) {
  auto chunk = chunks[ci];
  auto cd = chunk->getColumnDesc();
  if (!cd->columnType.is_fixlen_array()) {
  if (cd->columnType.is_date_in_days()) {
    auto& stats = update_stats_per_thread[ci].new_values_stats;
    update_stats_per_thread[ci].new_values_stats,
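
// UpdelRoll: staging state for an in-flight UPDATE/DELETE. It tracks dirty
// chunks and shadow per-fragment metadata, publishes them to the fragmenter
// on commit, and on cancel restores the table epochs (for varlen updates) and
// releases the dirty chunk buffers.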
if (nullptr == catalog) {
dirty_chunks.clear();
updateFragmenterAndCleanupChunks();
CHECK(table_descriptor);
auto table_id = table_descriptor->tableId;
dirty_chunks.clear();
updateFragmenterAndCleanupChunks();
for (auto& cm : chunk_metadata_map_per_fragment) {
  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
for (const auto& [chunk_key, chunk] : dirty_chunks) {
dirty_chunks.clear();
if (nullptr == catalog) {
if (is_varlen_update) {
  auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
  dirty_chunks.clear();
for (const auto& [chunk_key, chunk] : dirty_chunks) {
  chunk->setBuffer(nullptr);
int32_t fragment_id) {
    chunk->getColumnDesc()->tableId,
    chunk->getColumnDesc()->columnId,
dirty_chunks[chunk_key] = chunk;
if (chunk_metadata_map_per_fragment.count(key) == 0) {
  chunk_metadata_map_per_fragment[key] =
if (num_tuples.count(key) == 0) {
initializeUnsetMetadata(key.first, fragment_info);
auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
auto chunk_metadata_it = metadata_map_it->second.find(column_id);
CHECK(chunk_metadata_it != metadata_map_it->second.end());
return chunk_metadata_it->second;
auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
return metadata_map_it->second;
auto it = num_tuples.find(key);
CHECK(it != num_tuples.end());
num_tuples[key] = num_tuple;