#include <boost/lexical_cast.hpp>
#include <type_traits>

#define DROP_FRAGMENT_FACTOR \
  0.97  // drop to 97% of max so we don't keep adding and dropping fragments

namespace Fragmenter_Namespace {
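// The constructor wires up one Chunk per column for this table and notes the
// materialized "rowid" column, if present, so inserts can populate it.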
InsertOrderFragmenter::InsertOrderFragmenter(
    const vector<int> chunkKeyPrefix,
    vector<Chunk>& chunkVec,
    Data_Namespace::DataMgr* dataMgr,
    Catalog_Namespace::Catalog* catalog,
    const int physicalTableId,
    const int shard,
    const size_t maxFragmentRows,
    const size_t maxChunkSize,
    const size_t pageSize,
    const size_t maxRows,
    const Data_Namespace::MemoryLevel defaultInsertLevel,
    const bool uses_foreign_storage)
    : chunkKeyPrefix_(chunkKeyPrefix)
    , dataMgr_(dataMgr)
    , catalog_(catalog)
    , physicalTableId_(physicalTableId)
    , shard_(shard)
    , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
    , pageSize_(pageSize)
    , numTuples_(0)
    , maxFragmentId_(-1)
    , maxChunkSize_(maxChunkSize)
    , maxRows_(maxRows)
    , fragmenterType_("insert_order")
    , defaultInsertLevel_(defaultInsertLevel)
    , uses_foreign_storage_(uses_foreign_storage)
    , mutex_access_inmem_states(new std::mutex)
    , hasMaterializedRowId_(false) {
  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
    int columnId = colIt->getColumnDesc()->columnId;
    columnMap_[columnId] = *colIt;
    if (colIt->getColumnDesc()->columnName == "rowid") {
      hasMaterializedRowId_ = true;
      rowIdColId_ = columnId;
    }
  }
  conditionallyInstantiateFileMgrWithParams();
}

InsertOrderFragmenter::~InsertOrderFragmenter() {}
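// Chooses the device a fragment lives on at each memory level; with
// g_use_table_device_offset set, the table id shifts the round-robin so
// fragments of different tables start on different devices.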
int compute_device_for_fragment(const int table_id,
                                const int fragment_id,
                                const int num_devices) {
  if (g_use_table_device_offset) {
    return (table_id + fragment_id) % num_devices;
  }
  return fragment_id % num_devices;
}
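// Caps the row count for the next append: start from the rows that still fit
// in the fragment, then shrink until every variable-length column stays
// within its per-chunk byte budget.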
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment,
                              const size_t num_rows_left,
                              const size_t num_rows_inserted,
                              const std::unordered_map<int, size_t>& var_len_col_info,
                              const size_t max_chunk_size,
                              const InsertChunks& insert_chunks,
                              std::map<int, Chunk_NS::Chunk>& column_map,
                              const std::vector<size_t>& valid_row_indices) {
  size_t num_rows_to_insert = min(rows_left_in_current_fragment, num_rows_left);
  if (rows_left_in_current_fragment != 0) {
    for (const auto& var_len_col_info_it : var_len_col_info) {
      CHECK_LE(var_len_col_info_it.second, max_chunk_size);
      size_t bytes_left = max_chunk_size - var_len_col_info_it.second;
      auto find_it = insert_chunks.chunks.find(var_len_col_info_it.first);
      if (find_it == insert_chunks.chunks.end()) {
        continue;
      }
      const auto& chunk = find_it->second;
      auto column_type = chunk->getColumnDesc()->columnType;
      const int8_t* index_buffer_ptr =
          column_type.is_varlen_indeed() ? chunk->getIndexBuf()->getMemoryPtr()
                                         : nullptr;
      CHECK(column_type.is_varlen());
      auto col_map_it = column_map.find(var_len_col_info_it.first);
      num_rows_to_insert =
          std::min(num_rows_to_insert,
                   col_map_it->second.getNumElemsForBytesEncodedDataAtIndices(
                       index_buffer_ptr, valid_row_indices, bytes_left));
    }
  }
  return num_rows_to_insert;
}
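// FileMgrs are created lazily on first use, so per-table storage parameters
// (currently max_rollback_epochs) are pushed down to the global FileMgr here.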
void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
  if (!uses_foreign_storage_ &&
      defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    const TableDescriptor* td = catalog_->getMetadataForTable(physicalTableId_, false);
    File_Namespace::FileMgrParams fileMgrParams;
    fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
    dataMgr_->getGlobalFileMgr()->setFileMgrParams(
        chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
  }
}
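// Rebuilds fragment bookkeeping from the chunk metadata already on disk:
// metadata arrives keyed by (db, table, column, fragment), is re-sorted by
// fragment id, and one FragmentInfo is materialized per fragment.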
void InsertOrderFragmenter::getChunkMetadata() {
  if (uses_foreign_storage_ ||
      defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    ChunkMetadataVector chunk_metadata;
    dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);

    // data comes as (database_id, table_id, column_id, fragment_id); sort by
    // fragment id so we can traverse fragment by fragment
    int fragment_subkey_index = 3;
    std::sort(chunk_metadata.begin(),
              chunk_metadata.end(),
              [&](const auto& pair1, const auto& pair2) {
                return pair1.first[3] < pair2.first[3];
              });

    for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
         ++chunk_itr) {
      int cur_column_id = chunk_itr->first[2];
      int cur_fragment_id = chunk_itr->first[fragment_subkey_index];

      if (fragmentInfoVec_.empty() ||
          cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
        auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
        CHECK(new_fragment_info);
        maxFragmentId_ = cur_fragment_id;
        new_fragment_info->fragmentId = cur_fragment_id;
        new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
        numTuples_ += new_fragment_info->getPhysicalNumTuples();
        for (const auto level_size : dataMgr_->levelSizes_) {
          new_fragment_info->deviceIds.push_back(compute_device_for_fragment(
              physicalTableId_, cur_fragment_id, level_size));
        }
        new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
        new_fragment_info->physicalTableId = physicalTableId_;
        new_fragment_info->shard = shard_;
        fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
      } else if (chunk_itr->second->numElements !=
                 fragmentInfoVec_.back()->getPhysicalNumTuples()) {
        LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
                          std::to_string(physicalTableId_) + ", column " +
                          std::to_string(cur_column_id) + ": fragment tuples " +
                          std::to_string(
                              fragmentInfoVec_.back()->getPhysicalNumTuples()) +
                          ", chunk tuples " +
                          std::to_string(chunk_itr->second->numElements);
      }
      CHECK(fragmentInfoVec_.back().get());
      fragmentInfoVec_.back()->setChunkMetadata(cur_column_id, chunk_itr->second);
    }
  }

  size_t maxFixedColSize = 0;

  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
    auto size = colIt->second.getColumnDesc()->columnType.get_size();
    if (size == -1) {  // variable-length column: track its chunk byte usage
      varLenColInfo_.insert(std::make_pair(colIt->first, 0));
      size = 8;  // size of the string/array offset index entries
    }
    maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
  }

  // maximum number of rows assuming everything is fixed length
  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
  setLastFragmentVarLenColumnSizes();
}
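// Oldest fragments are dropped first once the table exceeds max_rows; the
// target is DROP_FRAGMENT_FACTOR * max_rows so inserts don't oscillate
// between adding and dropping fragments.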
void InsertOrderFragmenter::dropFragmentsToSize(const size_t max_rows) {
  dropFragmentsToSizeNoInsertLock(max_rows);
}

void InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock(const size_t max_rows) {
  // never drop the only fragment
  if (fragmentInfoVec_.empty() ||
      numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
    return;
  }

  if (numTuples_ > max_rows) {
    size_t preNumTuples = numTuples_;
    vector<int> dropFragIds;
    size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
    while (numTuples_ > targetRows) {
      CHECK_GT(fragmentInfoVec_.size(), size_t(0));
      size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
      dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
      fragmentInfoVec_.pop_front();
      CHECK_GE(numTuples_, numFragTuples);
      numTuples_ -= numFragTuples;
    }
    deleteFragments(dropFragIds);
    LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
              << " post: " << numTuples_ << " maxRows: " << max_rows;
  }
}
void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
  auto chunkKeyPrefix = chunkKeyPrefix_;
  if (shard_ >= 0) {
    chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
  }

  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
  // SELECT and COPY may enter a deadlock
  const auto delete_lock =
      lockmgr::TableDataLockMgr::getWriteLockForTable(chunkKeyPrefix);

  for (const auto fragId : dropFragIds) {
    for (const auto& col : columnMap_) {
      int colId = col.first;
      vector<int> fragPrefix = chunkKeyPrefix_;
      fragPrefix.push_back(colId);
      fragPrefix.push_back(fragId);
      dataMgr_->deleteChunksWithPrefix(fragPrefix);
    }
  }
}
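// Replaces the stored metadata for one column of one fragment, typically
// after the column's data has been synced to disk.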
void InsertOrderFragmenter::updateColumnChunkMetadata(
    const ColumnDescriptor* cd,
    const int fragment_id,
    const std::shared_ptr<ChunkMetadata> metadata) {
  CHECK(metadata.get());
  auto fragment_info = getFragmentInfo(fragment_id);
  CHECK(fragment_info);
  fragment_info->setChunkMetadata(cd->columnId, metadata);
}
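// Applies externally computed per-fragment ChunkStats to a column: each
// fragment's encoder is asked to reset its stats, and on success the
// fragment's metadata map is refreshed from the encoder.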
void InsertOrderFragmenter::updateChunkStats(
    const ColumnDescriptor* cd,
    std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
    std::optional<Data_Namespace::MemoryLevel> memory_level) {
  if (shard_ >= 0) {
    LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
  }

  CHECK(cd);
  const auto column_id = cd->columnId;
  const auto col_itr = columnMap_.find(column_id);
  CHECK(col_itr != columnMap_.end());

  for (auto const& fragment : fragmentInfoVec_) {
    auto stats_itr = stats_map.find(fragment->fragmentId);
    if (stats_itr != stats_map.end()) {
      auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
      CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
      ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
                         physicalTableId_,
                         column_id,
                         fragment->fragmentId};
      auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                             &catalog_->getDataMgr(),
                                             chunk_key,
                                             memory_level.value_or(defaultInsertLevel_),
                                             0,
                                             chunk_meta_it->second->numBytes,
                                             chunk_meta_it->second->numElements);
      auto buf = chunk->getBuffer();
      CHECK(buf);
      if (!buf->hasEncoder()) {
        throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
      }
      auto encoder = buf->getEncoder();

      auto chunk_stats = stats_itr->second;

      auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
      encoder->getMetadata(old_chunk_metadata);
      auto& old_chunk_stats = old_chunk_metadata->chunkStats;

      const bool didResetStats = encoder->resetChunkStats(chunk_stats);
      // use the logical type to display data, since the encoding should be ignored
      const auto logical_ti = cd->columnType.is_dict_encoded_string()
                                  ? SQLTypeInfo(kBIGINT)
                                  : get_logical_type_info(cd->columnType);
      if (!didResetStats) {
        VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
        VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
                << DatumToString(chunk_stats.max, logical_ti);
        VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
                << DatumToString(chunk_stats.min, logical_ti);
        VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
        continue;  // move to next fragment
      }

      VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
      VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
              << DatumToString(chunk_stats.max, logical_ti);
      VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
              << DatumToString(chunk_stats.min, logical_ti);
      VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");

      auto new_metadata = std::make_shared<ChunkMetadata>();
      encoder->getMetadata(new_metadata);
      fragment->setChunkMetadata(column_id, new_metadata);
      fragment->shadowChunkMetadataMap =
          fragment->getChunkMetadataMapPhysicalCopy();
    } else {
      LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
                   << ", table " << physicalTableId_ << ", column " << column_id;
    }
  }
}
FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
                                  fragmentInfoVec_.end(),
                                  [fragment_id](const auto& fragment) -> bool {
                                    return fragment->fragmentId == fragment_id;
                                  });
  CHECK(fragment_it != fragmentInfoVec_.end());
  return fragment_it->get();
}
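// An insert either touches only existing columns or only new ones (ALTER
// TABLE ADD COLUMN); the XOR check below rejects mixed inserts.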
bool InsertOrderFragmenter::isAddingNewColumns(const InsertData& insert_data) const {
  bool all_columns_already_exist = true, all_columns_are_new = true;
  for (const auto column_id : insert_data.columnIds) {
    if (columnMap_.find(column_id) == columnMap_.end()) {
      all_columns_already_exist = false;
    } else {
      all_columns_are_new = false;
    }
  }
  // only one of the two may hold
  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
  CHECK(either_all_exist_or_all_new);
  return all_columns_are_new;
}
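// Inserts checkpoint only when the data is disk-resident; on any failure the
// table epochs are rolled back so partially written data is discarded.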
void InsertOrderFragmenter::insertChunks(const InsertChunks& insert_chunk) {
  try {
    insertChunksImpl(insert_chunk);
    if (defaultInsertLevel_ ==
        Data_Namespace::DISK_LEVEL) {  // only checkpoint if data is resident on disk
      dataMgr_->checkpoint(chunkKeyPrefix_[0], chunkKeyPrefix_[1]);
    }
  } catch (...) {
    auto db_id = insert_chunk.db_id;
    auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
    catalog_->setTableEpochs(db_id, table_epochs);
    throw;
  }
}
void InsertOrderFragmenter::insertData(InsertData& insert_data_struct) {
  try {
    if (!isAddingNewColumns(insert_data_struct)) {
      insertDataImpl(insert_data_struct);
    } else {
      addColumns(insert_data_struct);
    }
    if (defaultInsertLevel_ ==
        Data_Namespace::DISK_LEVEL) {  // only checkpoint if data is resident on disk
      dataMgr_->checkpoint(chunkKeyPrefix_[0], chunkKeyPrefix_[1]);
    }
  } catch (...) {
    auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
                                                 insert_data_struct.tableId);
    catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
    throw;
  }
}
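// The *NoCheckpoint variants are for callers that manage checkpointing
// themselves, e.g. when batching several inserts before a single checkpoint.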
void InsertOrderFragmenter::insertChunksNoCheckpoint(const InsertChunks& insert_chunk) {
  insertChunksImpl(insert_chunk);
}

void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insert_data_struct) {
  if (!isAddingNewColumns(insert_data_struct)) {
    insertDataImpl(insert_data_struct);
  } else {
    addColumns(insert_data_struct);
  }
}
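// Backfills newly added columns across every existing fragment. Chunk
// metadata is staged in shadowChunkMetadataMap and committed only once all
// fragments succeed; on error the new columns are removed from columnMap_.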
void InsertOrderFragmenter::addColumns(const InsertData& insertDataStruct) {
  size_t numRowsLeft = insertDataStruct.numRows;
  for (const auto columnId : insertDataStruct.columnIds) {
    CHECK(columnMap_.end() == columnMap_.find(columnId));
    const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
    CHECK(columnDesc);
    columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
  }
  try {
    for (auto const& fragmentInfo : fragmentInfoVec_) {
      fragmentInfo->shadowChunkMetadataMap =
          fragmentInfo->getChunkMetadataMapPhysicalCopy();
      auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples();
      size_t numRowsCanBeInserted;
      for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
        auto columnId = insertDataStruct.columnIds[i];
        auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
        CHECK(colDesc);
        CHECK(columnMap_.find(columnId) != columnMap_.end());

        ChunkKey chunkKey = chunkKeyPrefix_;
        chunkKey.push_back(columnId);
        chunkKey.push_back(fragmentInfo->fragmentId);

        auto colMapIt = columnMap_.find(columnId);
        auto& chunk = colMapIt->second;
        if (chunk.isChunkOnDevice(
                dataMgr_,
                chunkKey,
                defaultInsertLevel_,
                fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
          dataMgr_->deleteChunksWithPrefix(chunkKey);
        }
        chunk.createChunkBuffer(
            dataMgr_,
            chunkKey,
            defaultInsertLevel_,
            fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
        chunk.initEncoder();

        try {
          DataBlockPtr dataCopy = insertDataStruct.data[i];
          auto size = colDesc->columnType.get_size();
          if (0 > size) {  // variable-length column
            std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
            varLenColInfo_[columnId] = 0;
            numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
                dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
          } else {
            numRowsCanBeInserted = maxChunkSize_ / size;
          }

          if (numRowsCanBeInserted < numRowsToInsert) {
            throw std::runtime_error("new column '" + colDesc->columnName +
                                     "' wider than existing columns is not supported");
          }

          auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
          fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;

          if (0 > size) {
            std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
            varLenColInfo_[columnId] = chunk.getBuffer()->size();
          }
        } catch (...) {
          dataMgr_->deleteChunksWithPrefix(chunkKey);
          throw;
        }
      }
      numRowsLeft -= numRowsToInsert;
    }
    CHECK(0 == numRowsLeft);
  } catch (const std::exception& e) {
    for (const auto columnId : insertDataStruct.columnIds) {
      columnMap_.erase(columnId);
    }
    throw e;
  }

  for (auto const& fragmentInfo : fragmentInfoVec_) {
    fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
  }
}
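// Drops columns by erasing them from the column map, deleting their chunks,
// and scrubbing their entries from each fragment's staged metadata.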
void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
  for (auto const& fragmentInfo : fragmentInfoVec_) {
    fragmentInfo->shadowChunkMetadataMap =
        fragmentInfo->getChunkMetadataMapPhysicalCopy();
  }

  for (const auto columnId : columnIds) {
    auto cit = columnMap_.find(columnId);
    if (columnMap_.end() != cit) {
      columnMap_.erase(cit);
    }

    vector<int> fragPrefix = chunkKeyPrefix_;
    fragPrefix.push_back(columnId);
    dataMgr_->deleteChunksWithPrefix(fragPrefix);

    for (const auto& fragmentInfo : fragmentInfoVec_) {
      auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
      if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
        fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
      }
    }
  }
  for (const auto& fragmentInfo : fragmentInfoVec_) {
    fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
  }
}
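// The delete flag column stores tinyint 0/1 per row; a max stat of 1 in any
// fragment means the table has at least one deleted row.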
bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
  for (auto const& fragment : fragmentInfoVec_) {
    auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
    CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
    const auto& chunk_stats = chunk_meta_it->second->chunkStats;
    if (chunk_stats.max.tinyintval == 1) {
      return true;
    }
  }
  return false;
}
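// Appends the first num_rows_to_insert valid rows into the current fragment:
// per-column encoded data, then synthesized rowid and delete-flag blocks,
// and finally the shadow tuple counts and metadata are committed.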
void InsertOrderFragmenter::insertChunksIntoFragment(
    const InsertChunks& insert_chunks,
    const std::optional<int> delete_column_id,
    FragmentInfo* current_fragment,
    const size_t num_rows_to_insert,
    size_t& num_rows_inserted,
    size_t& num_rows_left,
    std::vector<size_t>& valid_row_indices,
    const size_t start_fragment) {
  // keep only the indices that go into this fragment
  auto insert_row_indices = valid_row_indices;
  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
                           insert_row_indices.end());
  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
  for (auto& [column_id, chunk] : insert_chunks.chunks) {
    auto col_map_it = columnMap_.find(column_id);
    CHECK(col_map_it != columnMap_.end());
    current_fragment->shadowChunkMetadataMap[column_id] =
        col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
    auto var_len_col_info_it = varLenColInfo_.find(column_id);
    if (var_len_col_info_it != varLenColInfo_.end()) {
      var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
      CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
    }
  }

  if (hasMaterializedRowId_) {
    size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
                      current_fragment->shadowNumTuples;
    std::vector<int64_t> row_id_data(num_rows_to_insert);
    for (size_t i = 0; i < num_rows_to_insert; ++i) {
      row_id_data[i] = i + start_id;
    }
    DataBlockPtr row_id_block;
    row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
    auto col_map_it = columnMap_.find(rowIdColId_);
    CHECK(col_map_it != columnMap_.end());
    current_fragment->shadowChunkMetadataMap[rowIdColId_] =
        col_map_it->second.appendData(
            row_id_block, num_rows_to_insert, num_rows_inserted);
  }

  if (delete_column_id) {
    std::vector<int8_t> delete_data(num_rows_to_insert, false);
    DataBlockPtr delete_block;
    delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
    auto col_map_it = columnMap_.find(*delete_column_id);
    CHECK(col_map_it != columnMap_.end());
    current_fragment->shadowChunkMetadataMap[*delete_column_id] =
        col_map_it->second.appendData(
            delete_block, num_rows_to_insert, num_rows_inserted);
  }

  current_fragment->shadowNumTuples =
      fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
  num_rows_left -= num_rows_to_insert;
  num_rows_inserted += num_rows_to_insert;
  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
       part_it != fragmentInfoVec_.end();
       ++part_it) {
    auto fragment_ptr = part_it->get();
    fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
    fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
  }

  // drop the consumed indices from the front of valid_row_indices
  valid_row_indices.erase(valid_row_indices.begin(),
                          valid_row_indices.begin() + num_rows_to_insert);
}
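// Validates that all incoming chunks agree on row count, then fills the last
// fragment and creates new fragments as needed until every valid row is in.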
void InsertOrderFragmenter::insertChunksImpl(const InsertChunks& insert_chunks) {
  std::optional<int> delete_column_id{std::nullopt};
  for (const auto& cit : columnMap_) {
    if (cit.second.getColumnDesc()->isDeletedCol) {
      delete_column_id = cit.second.getColumnDesc()->columnId;
    }
  }

  // verify that all chunks to be inserted have the same number of rows,
  // otherwise the input data is malformed
  std::optional<size_t> num_rows{std::nullopt};
  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
    auto buffer = chunk->getBuffer();
    CHECK(buffer);
    CHECK(buffer->hasEncoder());
    if (!num_rows.has_value()) {
      num_rows = buffer->getEncoder()->getNumElems();
    } else {
      CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
    }
  }

  auto valid_row_indices = insert_chunks.valid_row_indices;
  size_t num_rows_left = valid_row_indices.size();
  size_t num_rows_inserted = 0;

  if (num_rows_left == 0) {
    return;
  }

  FragmentInfo* current_fragment{nullptr};

  if (fragmentInfoVec_.empty()) {  // if no fragments exist for table
    current_fragment = createNewFragment(defaultInsertLevel_);
  } else {
    current_fragment = fragmentInfoVec_.back().get();
  }
  CHECK(current_fragment);

  size_t start_fragment = fragmentInfoVec_.size() - 1;

  while (num_rows_left > 0) {  // may have to create multiple fragments for bulk insert
    CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
    size_t rows_left_in_current_fragment =
        maxFragmentRows_ - current_fragment->shadowNumTuples;
    auto num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
                                                     num_rows_left,
                                                     num_rows_inserted,
                                                     varLenColInfo_,
                                                     maxChunkSize_,
                                                     insert_chunks,
                                                     columnMap_,
                                                     valid_row_indices);

    if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
      current_fragment = createNewFragment(defaultInsertLevel_);
      if (num_rows_inserted == 0) {
        throw std::runtime_error("Maximum row size reached");
      }
      rows_left_in_current_fragment = maxFragmentRows_;
      for (auto& varLenColInfoIt : varLenColInfo_) {
        varLenColInfoIt.second = 0;
      }
      num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
                                                  num_rows_left,
                                                  num_rows_inserted,
                                                  varLenColInfo_,
                                                  maxChunkSize_,
                                                  insert_chunks,
                                                  columnMap_,
                                                  valid_row_indices);
    }

    CHECK_GT(num_rows_to_insert, size_t(0));  // zero would loop forever

    insertChunksIntoFragment(insert_chunks,
                             delete_column_id,
                             current_fragment,
                             num_rows_to_insert,
                             num_rows_inserted,
                             num_rows_left,
                             valid_row_indices,
                             start_fragment);
  }
  numTuples_ += *num_rows;
  dropFragmentsToSizeNoInsertLock(maxRows_);
}
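// Row-wise counterpart of insertChunksImpl: appends DataBlockPtr buffers
// column by column, synthesizing the delete column and rowids when needed.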
void InsertOrderFragmenter::insertDataImpl(InsertData& insert_data) {
  // populate the delete column if it exists, as it will not come from the client
  std::unique_ptr<int8_t[]> data_for_deleted_column;
  for (const auto& cit : columnMap_) {
    if (cit.second.getColumnDesc()->isDeletedCol) {
      data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
      memset(data_for_deleted_column.get(), 0, insert_data.numRows);
      insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
      insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
      insert_data.is_default.push_back(false);
      break;
    }
  }
  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
  std::unordered_map<int, int> inverseInsertDataColIdMap;
  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
    inverseInsertDataColIdMap.insert(
        std::make_pair(insert_data.columnIds[insertId], insertId));
  }

  size_t numRowsLeft = insert_data.numRows;
  size_t numRowsInserted = 0;
  vector<DataBlockPtr> dataCopy =
      insert_data.data;  // appendData moves the pointers forward, which would
                         // violate the constness of InsertData
  if (numRowsLeft <= 0) {
    return;
  }

  FragmentInfo* currentFragment{nullptr};

  if (fragmentInfoVec_.empty()) {  // if no fragments exist for table
    currentFragment = createNewFragment(defaultInsertLevel_);
  } else {
    currentFragment = fragmentInfoVec_.back().get();
  }
  CHECK(currentFragment);

  size_t startFragment = fragmentInfoVec_.size() - 1;

  while (numRowsLeft > 0) {  // may have to create multiple fragments for bulk insert
    CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
    size_t rowsLeftInCurrentFragment =
        maxFragmentRows_ - currentFragment->shadowNumTuples;
    size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
    if (rowsLeftInCurrentFragment != 0) {
      for (auto& varLenColInfoIt : varLenColInfo_) {
        CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
        size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
        auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
        if (insertIdIt != inverseInsertDataColIdMap.end()) {
          auto colMapIt = columnMap_.find(varLenColInfoIt.first);
          numRowsToInsert = std::min(numRowsToInsert,
                                     colMapIt->second.getNumElemsForBytesInsertData(
                                         dataCopy[insertIdIt->second],
                                         numRowsToInsert,
                                         numRowsInserted,
                                         bytesLeft,
                                         insert_data.is_default[insertIdIt->second]));
        }
      }
    }

    if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
      currentFragment = createNewFragment(defaultInsertLevel_);
      if (numRowsInserted == 0) {
        throw std::runtime_error("Maximum row size reached");
      }
      rowsLeftInCurrentFragment = maxFragmentRows_;
      for (auto& varLenColInfoIt : varLenColInfo_) {
        varLenColInfoIt.second = 0;
      }
      numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
      for (auto& varLenColInfoIt : varLenColInfo_) {
        CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
        size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
        auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
        if (insertIdIt != inverseInsertDataColIdMap.end()) {
          auto colMapIt = columnMap_.find(varLenColInfoIt.first);
          numRowsToInsert = std::min(numRowsToInsert,
                                     colMapIt->second.getNumElemsForBytesInsertData(
                                         dataCopy[insertIdIt->second],
                                         numRowsToInsert,
                                         numRowsInserted,
                                         bytesLeft,
                                         insert_data.is_default[insertIdIt->second]));
        }
      }
    }

    CHECK_GT(numRowsToInsert, size_t(0));  // zero would loop forever

    // for each column, append the data in the appropriate insert buffer
    for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
      int columnId = insert_data.columnIds[i];
      auto colMapIt = columnMap_.find(columnId);
      CHECK(colMapIt != columnMap_.end());
      currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
          dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
      auto varLenColInfoIt = varLenColInfo_.find(columnId);
      if (varLenColInfoIt != varLenColInfo_.end()) {
        varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
      }
    }
    if (hasMaterializedRowId_) {
      size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
                       currentFragment->shadowNumTuples;
      auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
      for (size_t i = 0; i < numRowsToInsert; ++i) {
        row_id_data[i] = i + startId;
      }
      DataBlockPtr rowIdBlock;
      rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
      auto colMapIt = columnMap_.find(rowIdColId_);
      currentFragment->shadowChunkMetadataMap[rowIdColId_] =
          colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
    }

    currentFragment->shadowNumTuples =
        fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
    numRowsLeft -= numRowsToInsert;
    numRowsInserted += numRowsToInsert;
    for (auto partIt = fragmentInfoVec_.begin() + startFragment;
         partIt != fragmentInfoVec_.end();
         ++partIt) {
      auto fragment_ptr = partIt->get();
      fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
      fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
    }
  }
  numTuples_ += insert_data.numRows;
  dropFragmentsToSizeNoInsertLock(maxRows_);
}
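// Creates the next fragment, assigns its devices, and opens a fresh insert
// buffer (chunk) per column at the requested memory level.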
FragmentInfo* InsertOrderFragmenter::createNewFragment(
    const Data_Namespace::MemoryLevel memoryLevel) {
  maxFragmentId_++;
  auto newFragmentInfo = std::make_unique<FragmentInfo>();
  newFragmentInfo->fragmentId = maxFragmentId_;
  newFragmentInfo->shadowNumTuples = 0;
  newFragmentInfo->setPhysicalNumTuples(0);
  for (const auto levelSize : dataMgr_->levelSizes_) {
    newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
        physicalTableId_, newFragmentInfo->fragmentId, levelSize));
  }
  newFragmentInfo->physicalTableId = physicalTableId_;
  newFragmentInfo->shard = shard_;

  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
       colMapIt != columnMap_.end();
       ++colMapIt) {
    auto& chunk = colMapIt->second;
    if (memoryLevel == Data_Namespace::MemoryLevel::CPU_LEVEL) {
      // temporary tables would lose their reference to chunks rolled off by
      // the new fragment, so keep them pinned until the fragmenter dies
      tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
    }
    ChunkKey chunkKey = chunkKeyPrefix_;
    chunkKey.push_back(chunk.getColumnDesc()->columnId);
    chunkKey.push_back(maxFragmentId_);
    chunk.createChunkBuffer(dataMgr_,
                            chunkKey,
                            memoryLevel,
                            newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
                            pageSize_);
    chunk.initEncoder();
  }

  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
  return fragmentInfoVec_.back().get();
}
size_t InsertOrderFragmenter::getNumFragments() {
  return fragmentInfoVec_.size();
}
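// Snapshots fragment metadata for the executor. An empty table gets one
// dummy fragment so downstream code needs no zero-row special case, and
// zero-tuple fragments from racing inserts are pruned from the copy.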
TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
  TableInfo queryInfo;
  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
  // no predicate is tested here; return a copy of all fragments
  bool fragmentsExist = false;
  if (fragmentInfoVec_.empty()) {
    // add a dummy empty fragment so the executor does not need separate
    // logic for tables with no rows
    int maxFragmentId = 0;
    FragmentInfo emptyFragmentInfo;
    emptyFragmentInfo.fragmentId = maxFragmentId;
    emptyFragmentInfo.shadowNumTuples = 0;
    emptyFragmentInfo.setPhysicalNumTuples(0);
    emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
    emptyFragmentInfo.physicalTableId = physicalTableId_;
    emptyFragmentInfo.shard = shard_;
    queryInfo.fragments.push_back(emptyFragmentInfo);
  } else {
    fragmentsExist = true;
    std::for_each(
        fragmentInfoVec_.begin(),
        fragmentInfoVec_.end(),
        [&queryInfo](const auto& fragment_owned_ptr) {
          queryInfo.fragments.emplace_back(*fragment_owned_ptr);
        });
  }

  queryInfo.setPhysicalNumTuples(0);
  auto partIt = queryInfo.fragments.begin();
  if (fragmentsExist) {
    while (partIt != queryInfo.fragments.end()) {
      if (partIt->getPhysicalNumTuples() == 0) {
        // a concurrent insert created this fragment after the query came in;
        // drop it from the copy so the executor sees consistent metadata
        partIt = queryInfo.fragments.erase(partIt);
      } else {
        queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
                                       partIt->getPhysicalNumTuples());
        ++partIt;
      }
    }
  }
  return queryInfo;
}
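// Recomputes numTuples_ from the fragments' physical tuple counts and
// refreshes the last fragment's variable-length column sizes.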
void InsertOrderFragmenter::resetSizesFromFragments() {
  numTuples_ = 0;
  for (const auto& fragment_info : fragmentInfoVec_) {
    numTuples_ += fragment_info->getPhysicalNumTuples();
  }
  setLastFragmentVarLenColumnSizes();
}
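// Re-attaches the insert buffers of the last (open) fragment and seeds
// varLenColInfo_ with the current byte size of each var-len chunk.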
void InsertOrderFragmenter::setLastFragmentVarLenColumnSizes() {
  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
    // get the insert buffers for each column; should be the last fragment
    int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
    int deviceId =
        fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
    for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
      ChunkKey insertKey = chunkKeyPrefix_;  // database_id and table_id
      insertKey.push_back(colIt->first);     // column id
      insertKey.push_back(lastFragmentId);   // fragment id
      colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
      auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
      if (varLenColInfoIt != varLenColInfo_.end()) {
        varLenColInfoIt->second = colIt->second.getBuffer()->size();
      }
    }
  }
}

}  // namespace Fragmenter_Namespace