OmniSciDB  cde582ebc3
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
size_t getNumFragments () override
 returns the number of fragments in a table More...
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row counts of all fragments More...
 
void insertData (InsertData &insert_data_struct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertChunks (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments. More...
 
void insertDataNoCheckpoint (InsertData &insert_data_struct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void insertChunksNoCheckpoint (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments; no locks or checkpoints taken. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
std::optional< ChunkUpdateStats > updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
void resetSizesFromFragments () override
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insert_data)
 
void insertChunksImpl (const InsertChunks &insert_chunk)
 
void addColumns (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
heavyai::shared_mutex fragmentInfoMutex_
 
heavyai::shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Private Member Functions

bool isAddingNewColumns (const InsertData &insert_data) const
 
void dropFragmentsToSizeNoInsertLock (const size_t max_rows)
 
void setLastFragmentVarLenColumnSizes ()
 
void insertChunksIntoFragment (const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.
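
A minimal construction sketch, not taken from the sources: the caller is assumed to already hold the catalog, data manager, per-column chunk vector, and ids; only the constructor documented on this page is used, and a negative shard is assumed to denote an unsharded table. The remaining sizing parameters fall back to their defaults.

// Hedged usage sketch; all parameters of this helper are assumptions.
void make_fragmenter_example(std::vector<Chunk_NS::Chunk>& chunk_vec,
                             Data_Namespace::DataMgr* data_mgr,
                             Catalog_Namespace::Catalog* catalog,
                             const int db_id,
                             const int table_id) {
  std::vector<int> chunk_key_prefix{db_id, table_id};  // {database_id, table_id}
  Fragmenter_Namespace::InsertOrderFragmenter fragmenter(
      chunk_key_prefix, chunk_vec, data_mgr, catalog, table_id, /*shard=*/-1);
  // ... feed rows via insertData()/insertChunks(), then snapshot fragments
  // via getFragmentsForQuery().
}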

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 95 of file InsertOrderFragmenter.cpp.

95 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter & )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::addColumns ( const InsertData &  insertDataStruct)
protected

Definition at line 499 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

499  {
500  // synchronize concurrent accesses to fragmentInfoVec_
502  size_t numRowsLeft = insertDataStruct.numRows;
503  for (const auto columnId : insertDataStruct.columnIds) {
504  CHECK(columnMap_.end() == columnMap_.find(columnId));
505  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
506  CHECK(columnDesc);
507  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
508  }
509  try {
510  for (auto const& fragmentInfo : fragmentInfoVec_) {
511  fragmentInfo->shadowChunkMetadataMap =
512  fragmentInfo->getChunkMetadataMapPhysicalCopy();
513  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
514  size_t numRowsCanBeInserted;
515  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
516  auto columnId = insertDataStruct.columnIds[i];
517  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
518  CHECK(colDesc);
519  CHECK(columnMap_.find(columnId) != columnMap_.end());
520 
521  ChunkKey chunkKey = chunkKeyPrefix_;
522  chunkKey.push_back(columnId);
523  chunkKey.push_back(fragmentInfo->fragmentId);
524 
525  auto colMapIt = columnMap_.find(columnId);
526  auto& chunk = colMapIt->second;
527  if (chunk.isChunkOnDevice(
528  dataMgr_,
529  chunkKey,
531  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
532  dataMgr_->deleteChunksWithPrefix(chunkKey);
533  }
534  chunk.createChunkBuffer(
535  dataMgr_,
536  chunkKey,
538  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
539  chunk.initEncoder();
540 
541  try {
542  DataBlockPtr dataCopy = insertDataStruct.data[i];
543  auto size = colDesc->columnType.get_size();
544  if (0 > size) {
545  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
546  varLenColInfo_[columnId] = 0;
547  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
548  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
549  } else {
550  numRowsCanBeInserted = maxChunkSize_ / size;
551  }
552 
553  // FIXME: abort a case in which new column is wider than existing columns
554  if (numRowsCanBeInserted < numRowsToInsert) {
555  throw std::runtime_error("new column '" + colDesc->columnName +
556  "' wider than existing columns is not supported");
557  }
558 
559  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
560  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
561 
562  // update total size of var-len column in (actually the last) fragment
563  if (0 > size) {
564  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
565  varLenColInfo_[columnId] = chunk.getBuffer()->size();
566  }
567  } catch (...) {
568  dataMgr_->deleteChunksWithPrefix(chunkKey);
569  throw;
570  }
571  }
572  numRowsLeft -= numRowsToInsert;
573  }
574  CHECK(0 == numRowsLeft);
575  } catch (const std::exception& e) {
576  for (const auto columnId : insertDataStruct.columnIds) {
577  columnMap_.erase(columnId);
578  }
579  throw e;
580  }
581 
582  for (auto const& fragmentInfo : fragmentInfoVec_) {
583  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
584  }
585 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:495
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1274 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), getChunksForAllColumns(), getFragmentInfo(), Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), UpdelRoll::setNumTuple(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1279  {
1280  auto fragment_ptr = getFragmentInfo(fragment_id);
1281  auto& fragment = *fragment_ptr;
1282  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1283  const auto ncol = chunks.size();
1284 
1285  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1286 
1287  // parallel delete columns
1288  std::vector<std::future<void>> threads;
1289  auto nrows_to_vacuum = frag_offsets.size();
1290  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1291  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1292 
1293  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1294  auto chunk = chunks[ci];
1295  const auto cd = chunk->getColumnDesc();
1296  const auto& col_type = cd->columnType;
1297  auto data_buffer = chunk->getBuffer();
1298  auto index_buffer = chunk->getIndexBuf();
1299  auto data_addr = data_buffer->getMemoryPtr();
1300  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1301  auto index_array = (StringOffsetT*)indices_addr;
1302  bool is_varlen = col_type.is_varlen_indeed();
1303 
1304  auto fixlen_vacuum =
1305  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1306  size_t nbytes_fix_data_to_keep;
1307  if (nrows_to_keep == 0) {
1308  nbytes_fix_data_to_keep = 0;
1309  } else {
1310  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1311  }
1312 
1313  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1314  data_buffer->setSize(nbytes_fix_data_to_keep);
1315  data_buffer->setUpdated();
1316 
1317  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1318 
1319  auto daddr = data_addr;
1320  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1321  : get_element_size(col_type);
1322  data_buffer->getEncoder()->resetChunkStats();
1323  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1324  if (col_type.is_fixlen_array()) {
1325  auto encoder =
1326  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1327  CHECK(encoder);
1328  encoder->updateMetadata((int8_t*)daddr);
1329  } else if (col_type.is_fp()) {
1330  set_chunk_stats(col_type,
1331  daddr,
1332  update_stats_per_thread[ci].new_values_stats.has_null,
1333  update_stats_per_thread[ci].new_values_stats.min_double,
1334  update_stats_per_thread[ci].new_values_stats.max_double);
1335  } else {
1336  set_chunk_stats(col_type,
1337  daddr,
1338  update_stats_per_thread[ci].new_values_stats.has_null,
1339  update_stats_per_thread[ci].new_values_stats.min_int64t,
1340  update_stats_per_thread[ci].new_values_stats.max_int64t);
1341  }
1342  }
1343  };
1344 
1345  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1346  size_t nbytes_var_data_to_keep;
1347  if (nrows_to_keep == 0) {
1348  nbytes_var_data_to_keep = 0;
1349  } else {
1350  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1351  }
1352 
1353  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1354  data_buffer->setSize(nbytes_var_data_to_keep);
1355  data_buffer->setUpdated();
1356 
1357  index_buffer->setSize(sizeof(*index_array) *
1358  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1359  index_buffer->setUpdated();
1360 
1361  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1362  };
1363 
1364  if (is_varlen) {
1365  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1366  } else {
1367  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1368  }
1369  if (threads.size() >= (size_t)cpu_threads()) {
1370  wait_cleanup_threads(threads);
1371  }
1372  }
1373 
1374  wait_cleanup_threads(threads);
1375 
1376  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1377  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1378  auto chunk = chunks[ci];
1379  auto cd = chunk->getColumnDesc();
1380  if (!cd->columnType.is_fixlen_array()) {
1381  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1382  // stored in seconds. Do the metadata conversion here before updating the chunk
1383  // stats.
1384  if (cd->columnType.is_date_in_days()) {
1385  auto& stats = update_stats_per_thread[ci].new_values_stats;
1386  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1387  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1388  }
1390  fragment,
1391  chunk,
1392  update_stats_per_thread[ci].new_values_stats,
1393  cd->columnType,
1394  updel_roll);
1395  }
1396  }
1397 }
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:1113
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:222
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
int cpu_threads()
Definition: thread_count.h:24
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)


void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 148 of file InsertOrderFragmenter.cpp.

References catalog_(), Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

148  {
149  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
150  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
151  // storage per table
152  if (!uses_foreign_storage_ &&
154  const TableDescriptor* td =
155  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
156  File_Namespace::FileMgrParams fileMgrParams;
157  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
159  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
160  }
161 }
int32_t maxRollbackEpochs
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:609
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.


FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 936 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

937  {
938  // also sets the new fragment as the insertBuffer for each column
939 
940  maxFragmentId_++;
941  auto newFragmentInfo = std::make_unique<FragmentInfo>();
942  newFragmentInfo->fragmentId = maxFragmentId_;
943  newFragmentInfo->shadowNumTuples = 0;
944  newFragmentInfo->setPhysicalNumTuples(0);
945  for (const auto levelSize : dataMgr_->levelSizes_) {
946  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
947  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
948  }
949  newFragmentInfo->physicalTableId = physicalTableId_;
950  newFragmentInfo->shard = shard_;
951 
952  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
953  colMapIt != columnMap_.end();
954  ++colMapIt) {
955  ChunkKey chunkKey = chunkKeyPrefix_;
956  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
957  chunkKey.push_back(maxFragmentId_);
958  colMapIt->second.createChunkBuffer(
959  dataMgr_,
960  chunkKey,
961  memoryLevel,
962  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
963  pageSize_);
964  colMapIt->second.initEncoder();
965  }
966 
968  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
969  return fragmentInfoVec_.back().get();
970 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)


void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 269 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

269  {
270  // Fix a verified loophole on sharded logical table which is locked using logical
271  // tableId while it's its physical tables that can come here when fragments overflow
272  // during COPY. Locks on a logical table and its physical tables never intersect, which
273  // means potential races. It'll be an overkill to resolve a logical table to physical
274  // tables in DBHandler, ParseNode or other higher layers where the logical table is
275  // locked with Table Read/Write locks; it's easier to lock the logical table of its
276  // physical tables. A downside of this approach may be loss of parallel execution of
277  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
278  // operation, the loss seems not a big deal.
279  auto chunkKeyPrefix = chunkKeyPrefix_;
280  if (shard_ >= 0) {
281  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
282  }
283 
284  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
285  // SELECT and COPY may enter a deadlock
286  const auto delete_lock =
288 
290 
291  for (const auto fragId : dropFragIds) {
292  for (const auto& col : columnMap_) {
293  int colId = col.first;
294  vector<int> fragPrefix = chunkKeyPrefix_;
295  fragPrefix.push_back(colId);
296  fragPrefix.push_back(fragId);
297  dataMgr_->deleteChunksWithPrefix(fragPrefix);
298  }
299  }
300 }
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
std::unique_lock< T > unique_lock
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4740
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:495
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 587 of file InsertOrderFragmenter.cpp.

587  {
588  // prevent concurrent insert rows and drop column
590  // synchronize concurrent accesses to fragmentInfoVec_
592  for (auto const& fragmentInfo : fragmentInfoVec_) {
593  fragmentInfo->shadowChunkMetadataMap =
594  fragmentInfo->getChunkMetadataMapPhysicalCopy();
595  }
596 
597  for (const auto columnId : columnIds) {
598  auto cit = columnMap_.find(columnId);
599  if (columnMap_.end() != cit) {
600  columnMap_.erase(cit);
601  }
602 
603  vector<int> fragPrefix = chunkKeyPrefix_;
604  fragPrefix.push_back(columnId);
605  dataMgr_->deleteChunksWithPrefix(fragPrefix);
606 
607  for (const auto& fragmentInfo : fragmentInfoVec_) {
608  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
609  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
610  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
611  }
612  }
613  }
614  for (const auto& fragmentInfo : fragmentInfoVec_) {
615  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
616  }
617 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unique_lock< T > unique_lock
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:495
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 236 of file InsertOrderFragmenter.cpp.

236  {
239 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock ( const size_t  max_rows)
private

Definition at line 241 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

241  {
242  // not safe to call from outside insertData
243  // b/c depends on insertLock around numTuples_
244 
245  // don't ever drop the only fragment!
246  if (fragmentInfoVec_.empty() ||
247  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
248  return;
249  }
250 
251  if (numTuples_ > max_rows) {
252  size_t preNumTuples = numTuples_;
253  vector<int> dropFragIds;
254  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
255  while (numTuples_ > targetRows) {
256  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
257  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
258  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
259  fragmentInfoVec_.pop_front();
260  CHECK_GE(numTuples_, numFragTuples);
261  numTuples_ -= numFragTuples;
262  }
263  deleteFragments(dropFragIds);
264  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
265  << " post: " << numTuples_ << " maxRows: " << max_rows;
266  }
267 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:216
#define CHECK_GE(x, y)
Definition: Logger.h:235
#define CHECK_GT(x, y)
Definition: Logger.h:234
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
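
A hedged illustration of the truncation arithmetic above: fragments are dropped from the front of the deque (oldest first) until the tuple count falls below max_rows scaled by DROP_FRAGMENT_FACTOR. The factor's value is not shown on this page, so the constant below is an assumption for illustration only.

// Illustrative sketch only: mirrors the loop above. kDropFragmentFactor stands in
// for DROP_FRAGMENT_FACTOR, whose actual value is not shown on this page.
#include <cstddef>
#include <deque>

size_t simulate_drop_to_size(std::deque<size_t> fragment_tuples,  // oldest fragment first
                             size_t num_tuples,
                             const size_t max_rows) {
  constexpr double kDropFragmentFactor = 0.9;  // assumed value for illustration
  if (fragment_tuples.size() <= 1 || num_tuples <= max_rows) {
    return num_tuples;  // nothing to drop; the only fragment is never dropped
  }
  const auto target_rows = static_cast<size_t>(max_rows * kDropFragmentFactor);
  while (num_tuples > target_rows && fragment_tuples.size() > 1) {
    num_tuples -= fragment_tuples.front();  // drop the oldest fragment's rows
    fragment_tuples.pop_front();
  }
  return num_tuples;  // rows remaining, analogous to numTuples_ afterwards
}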
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 120 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 163 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

163  {
164  if (uses_foreign_storage_ ||
166  // memory-resident tables won't have anything on disk
167  ChunkMetadataVector chunk_metadata;
169 
170  // data comes like this - database_id, table_id, column_id, fragment_id
171  // but lets sort by database_id, table_id, fragment_id, column_id
172 
173  int fragment_subkey_index = 3;
174  std::sort(chunk_metadata.begin(),
175  chunk_metadata.end(),
176  [&](const auto& pair1, const auto& pair2) {
177  return pair1.first[3] < pair2.first[3];
178  });
179 
180  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
181  ++chunk_itr) {
182  int cur_column_id = chunk_itr->first[2];
183  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
184 
185  if (fragmentInfoVec_.empty() ||
186  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
187  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
188  CHECK(new_fragment_info);
189  maxFragmentId_ = cur_fragment_id;
190  new_fragment_info->fragmentId = cur_fragment_id;
191  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
192  numTuples_ += new_fragment_info->getPhysicalNumTuples();
193  for (const auto level_size : dataMgr_->levelSizes_) {
194  new_fragment_info->deviceIds.push_back(
195  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
196  }
197  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
198  new_fragment_info->physicalTableId = physicalTableId_;
199  new_fragment_info->shard = shard_;
200  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
201  } else {
202  if (chunk_itr->second->numElements !=
203  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
204  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
205  std::to_string(physicalTableId_) + ", Column " +
206  std::to_string(cur_column_id) + ". Fragment Tuples: " +
208  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
209  ", Chunk Tuples: " +
210  std::to_string(chunk_itr->second->numElements);
211  }
212  }
213  CHECK(fragmentInfoVec_.back().get());
214  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
215  }
216  }
217 
218  size_t maxFixedColSize = 0;
219 
220  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
221  auto size = colIt->second.getColumnDesc()->columnType.get_size();
222  if (size == -1) { // variable length
223  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
224  size = 8; // b/c we use this for string and array indices - gross to have magic
225  // number here
226  }
227  CHECK_GE(size, 0);
228  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
229  }
230 
231  // this is maximum number of rows assuming everything is fixed length
232  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
234 }
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:216
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:235
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:469
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)


auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 985 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

988  {
989  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
990  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
991  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
992  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
993  ++ncol;
994  if (!cd->isVirtualCol) {
995  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
996  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
997  ChunkKey chunk_key{
998  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
999  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1000  &catalog_->getDataMgr(),
1001  chunk_key,
1002  memory_level,
1003  0,
1004  chunk_meta_it->second->numBytes,
1005  chunk_meta_it->second->numElements);
1006  chunks.push_back(chunk);
1007  }
1008  }
1009  }
1010  return chunks;
1011 }
std::vector< int > ChunkKey
Definition: types.h:36
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:222
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31


int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 119 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

119 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 124 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 406 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

406  {
407  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
408  fragmentInfoVec_.end(),
409  [fragment_id](const auto& fragment) -> bool {
410  return fragment->fragmentId == fragment_id;
411  });
412  CHECK(fragment_it != fragmentInfoVec_.end());
413  return fragment_it->get();
414 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:222


FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 977 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

977  {
979  TableInfo queryInfo;
980  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
981  // right now we don't test predicate, so just return (copy of) all fragments
982  bool fragmentsExist = false;
983  if (fragmentInfoVec_.empty()) {
984  // If we have no fragments add a dummy empty fragment to make the executor
985  // not have separate logic for 0-row tables
986  int maxFragmentId = 0;
987  FragmentInfo emptyFragmentInfo;
988  emptyFragmentInfo.fragmentId = maxFragmentId;
989  emptyFragmentInfo.shadowNumTuples = 0;
990  emptyFragmentInfo.setPhysicalNumTuples(0);
991  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
992  emptyFragmentInfo.physicalTableId = physicalTableId_;
993  emptyFragmentInfo.shard = shard_;
994  queryInfo.fragments.push_back(emptyFragmentInfo);
995  } else {
996  fragmentsExist = true;
997  std::for_each(
998  fragmentInfoVec_.begin(),
999  fragmentInfoVec_.end(),
1000  [&queryInfo](const auto& fragment_owned_ptr) {
1001  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
1002  });
1003  }
1004  readLock.unlock();
1005  queryInfo.setPhysicalNumTuples(0);
1006  auto partIt = queryInfo.fragments.begin();
1007  if (fragmentsExist) {
1008  while (partIt != queryInfo.fragments.end()) {
1009  if (partIt->getPhysicalNumTuples() == 0) {
1010  // this means that a concurrent insert query inserted tuples into a new fragment
1011  // but when the query came in we didn't have this fragment. To make sure we
1012  // don't mess up the executor we delete this fragment from the metadatamap
1013  // (fixes earlier bug found 2015-05-08)
1014  partIt = queryInfo.fragments.erase(partIt);
1015  } else {
1016  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1017  partIt->getPhysicalNumTuples());
1018  ++partIt;
1019  }
1020  }
1021  } else {
1022  // We added a dummy fragment and know the table is empty
1023  queryInfo.setPhysicalNumTuples(0);
1024  }
1025  return queryInfo;
1026 }
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock

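A short consumer-side sketch, assuming a fragmenter instance is at hand; it uses only the TableInfo members referenced in the listing above (fragments, getPhysicalNumTuples).

// Hedged sketch: totalling rows from the snapshot returned by getFragmentsForQuery().
size_t count_rows_example(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter) {
  auto table_info = fragmenter.getFragmentsForQuery();
  size_t total_rows = 0;
  for (const auto& fragment : table_info.fragments) {
    total_rows += fragment.getPhysicalNumTuples();
  }
  return total_rows;  // matches table_info.getPhysicalNumTuples() absent concurrent inserts
}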

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumFragments ( )
overridevirtual

returns the number of fragments in a table

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 972 of file InsertOrderFragmenter.cpp.

972  {
974  return fragmentInfoVec_.size();
975 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1014 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1015  {
1016  const auto data_buffer = chunk->getBuffer();
1017  const auto data_addr = data_buffer->getMemoryPtr();
1018  const size_t nrows_in_chunk = data_buffer->size();
1019  const size_t ncore = cpu_threads();
1020  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1021  std::vector<std::vector<uint64_t>> deleted_offsets;
1022  deleted_offsets.resize(ncore);
1023  std::vector<std::future<void>> threads;
1024  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1025  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1026  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1027  const auto ithread = rbegin / segsz;
1028  CHECK(ithread < deleted_offsets.size());
1029  deleted_offsets[ithread].reserve(segsz);
1030  for (size_t r = rbegin; r < rend; ++r) {
1031  if (data_addr[r]) {
1032  deleted_offsets[ithread].push_back(r);
1033  }
1034  }
1035  }));
1036  }
1037  wait_cleanup_threads(threads);
1038  std::vector<uint64_t> all_deleted_offsets;
1039  for (size_t i = 0; i < ncore; ++i) {
1040  all_deleted_offsets.insert(
1041  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1042  }
1043  return all_deleted_offsets;
1044 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
#define CHECK(condition)
Definition: Logger.h:222
int cpu_threads()
Definition: thread_count.h:24

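A hedged sketch of how getVacuumOffsets() and compactRows() are typically chained (both are referenced from updateColumn): the delete chunk, table descriptor, fragment id, memory level, and UpdelRoll are assumed to be supplied by the caller.

// Hedged sketch: typical delete-vacuum flow. `delete_chunk` is assumed to hold
// the fragment's $deleted$ column.
void vacuum_fragment_example(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                             const Catalog_Namespace::Catalog* catalog,
                             const TableDescriptor* td,
                             const int fragment_id,
                             const std::shared_ptr<Chunk_NS::Chunk>& delete_chunk,
                             const Data_Namespace::MemoryLevel memory_level,
                             UpdelRoll& updel_roll) {
  const auto deleted_offsets = fragmenter.getVacuumOffsets(delete_chunk);
  if (!deleted_offsets.empty()) {
    fragmenter.compactRows(catalog, td, fragment_id, deleted_offsets,
                           memory_level, updel_roll);
  }
}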

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 619 of file InsertOrderFragmenter.cpp.

References CHECK.

619  {
621 
622  for (auto const& fragment : fragmentInfoVec_) {
623  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
624  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
625  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
626  if (chunk_stats.max.tinyintval == 1) {
627  return true;
628  }
629  }
630  return false;
631 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
#define CHECK(condition)
Definition: Logger.h:222
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunks ( const InsertChunks &  insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments.

Parameters
insert_chunk - the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 431 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertChunks::db_id, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertChunks::table_id.

431  {
432  try {
433  // prevent two threads from trying to insert into the same table simultaneously
435  insertChunksImpl(insert_chunk);
436  if (defaultInsertLevel_ ==
437  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
439  chunkKeyPrefix_[0],
440  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
441  }
442  } catch (...) {
443  auto db_id = insert_chunk.db_id;
444  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
445  // the statement below deletes *this* object!
446  // relying on exception propagation at this stage
447  // until we can sort this out in a cleaner fashion
448  catalog_->setTableEpochs(db_id, table_epochs);
449  throw;
450  }
451 }
void insertChunksImpl(const InsertChunks &insert_chunk)
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:548
std::unique_lock< T > unique_lock
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3584
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3556


void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksImpl ( const InsertChunks &  insert_chunk)
protected

Definition at line 703 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::get_num_rows_to_insert(), and Fragmenter_Namespace::InsertChunks::valid_row_indices.

703  {
704  std::optional<int> delete_column_id{std::nullopt};
705  for (const auto& cit : columnMap_) {
706  if (cit.second.getColumnDesc()->isDeletedCol) {
707  delete_column_id = cit.second.getColumnDesc()->columnId;
708  }
709  }
710 
711  // verify that all chunks to be inserted have same number of rows, otherwise the input
712  // data is malformed
713  std::optional<size_t> num_rows{std::nullopt};
714  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
715  auto buffer = chunk->getBuffer();
716  CHECK(buffer);
717  CHECK(buffer->hasEncoder());
718  if (!num_rows.has_value()) {
719  num_rows = buffer->getEncoder()->getNumElems();
720  } else {
721  CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
722  }
723  }
724 
725  auto valid_row_indices = insert_chunks.valid_row_indices;
726  size_t num_rows_left = valid_row_indices.size();
727  size_t num_rows_inserted = 0;
728 
729  if (num_rows_left == 0) {
730  return;
731  }
732 
733  FragmentInfo* current_fragment{nullptr};
734 
735  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
736  // feels fragile
737  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
738  current_fragment = createNewFragment(defaultInsertLevel_);
739  } else {
740  current_fragment = fragmentInfoVec_.back().get();
741  }
742  CHECK(current_fragment);
743 
744  size_t start_fragment = fragmentInfoVec_.size() - 1;
745 
746  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
747  // loop until done inserting all rows
748  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
749  size_t rows_left_in_current_fragment =
750  maxFragmentRows_ - current_fragment->shadowNumTuples;
751  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
752  num_rows_left,
753  num_rows_inserted,
756  insert_chunks,
757  columnMap_,
758  valid_row_indices);
759 
760  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
761  current_fragment = createNewFragment(defaultInsertLevel_);
762  if (num_rows_inserted == 0) {
763  start_fragment++;
764  }
765  rows_left_in_current_fragment = maxFragmentRows_;
766  for (auto& varLenColInfoIt : varLenColInfo_) {
767  varLenColInfoIt.second = 0; // reset byte counter
768  }
769  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
770  num_rows_left,
771  num_rows_inserted,
772  varLenColInfo_,
774  insert_chunks,
775  columnMap_,
776  valid_row_indices);
777  }
778 
779  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
780  // never be able to insert anything
781 
782  insertChunksIntoFragment(insert_chunks,
783  delete_column_id,
784  current_fragment,
785  num_rows_to_insert,
786  num_rows_inserted,
787  num_rows_left,
788  valid_row_indices,
789  start_fragment);
790  }
791  numTuples_ += *num_rows;
793 }
void insertChunksIntoFragment(const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:234
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment, const size_t num_rows_left, const size_t num_rows_inserted, const std::unordered_map< int, size_t > &var_len_col_info, const size_t max_chunk_size, const InsertChunks &insert_chunks, std::map< int, Chunk_NS::Chunk > &column_map, const std::vector< size_t > &valid_row_indices)


void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksIntoFragment ( const InsertChunks &  insert_chunks,
const std::optional< int >  delete_column_id,
FragmentInfo *  current_fragment,
const size_t  num_rows_to_insert,
size_t &  num_rows_inserted,
size_t &  num_rows_left,
std::vector< size_t > &  valid_row_indices,
const size_t  start_fragment 
)
private

Definition at line 633 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GE, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

641  {
643  // for each column, append the data in the appropriate insert buffer
644  auto insert_row_indices = valid_row_indices;
645  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
646  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
647  insert_row_indices.end());
648  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
649  for (auto& [column_id, chunk] : insert_chunks.chunks) {
650  auto col_map_it = columnMap_.find(column_id);
651  CHECK(col_map_it != columnMap_.end());
652  current_fragment->shadowChunkMetadataMap[column_id] =
653  col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
654  auto var_len_col_info_it = varLenColInfo_.find(column_id);
655  if (var_len_col_info_it != varLenColInfo_.end()) {
656  var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
657  CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
658  }
659  }
660  if (hasMaterializedRowId_) {
661  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
662  current_fragment->shadowNumTuples;
663  std::vector<int64_t> row_id_data(num_rows_to_insert);
664  for (size_t i = 0; i < num_rows_to_insert; ++i) {
665  row_id_data[i] = i + start_id;
666  }
667  DataBlockPtr row_id_block;
668  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
669  auto col_map_it = columnMap_.find(rowIdColId_);
670  CHECK(col_map_it != columnMap_.end());
671  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
672  row_id_block, num_rows_to_insert, num_rows_inserted);
673  }
674 
675  if (delete_column_id) { // has delete column
676  std::vector<int8_t> delete_data(num_rows_to_insert, false);
677  DataBlockPtr delete_block;
678  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
679  auto col_map_it = columnMap_.find(*delete_column_id);
680  CHECK(col_map_it != columnMap_.end());
681  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
682  col_map_it->second.appendData(
683  delete_block, num_rows_to_insert, num_rows_inserted);
684  }
685 
686  current_fragment->shadowNumTuples =
687  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
688  num_rows_left -= num_rows_to_insert;
689  num_rows_inserted += num_rows_to_insert;
690  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
691  part_it != fragmentInfoVec_.end();
692  ++part_it) {
693  auto fragment_ptr = part_it->get();
694  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
695  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
696  }
697 
698  // truncate the first `num_rows_to_insert` rows in `valid_row_indices`
699  valid_row_indices.erase(valid_row_indices.begin(),
700  valid_row_indices.begin() + num_rows_to_insert);
701 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GE(x, y)
Definition: Logger.h:235
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::unique_lock< T > unique_lock
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:226
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksNoCheckpoint ( const InsertChunks &  insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments; no locks or checkpoints taken.

Parameters
insert_chunk - the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 479 of file InsertOrderFragmenter.cpp.

479  {
480  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
482  insertMutex_); // prevent two threads from trying to insert into the same table
483  // simultaneously
484  insertChunksImpl(insert_chunk);
485 }
void insertChunksImpl(const InsertChunks &insert_chunk)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insert_data_struct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 453 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

453  {
454  try {
455  // prevent two threads from trying to insert into the same table simultaneously
457  if (!isAddingNewColumns(insert_data_struct)) {
458  insertDataImpl(insert_data_struct);
459  } else {
460  addColumns(insert_data_struct);
461  }
462  if (defaultInsertLevel_ ==
463  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
465  chunkKeyPrefix_[0],
466  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
467  }
468  } catch (...) {
469  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
470  insert_data_struct.tableId);
471  // the statement below deletes *this* object!
472  // relying on exception propagation at this stage
473  // until we can sort this out in a cleaner fashion
474  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
475  throw;
476  }
477 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:548
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3584
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3556

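A hedged client-side sketch of populating an InsertData struct, using only the fields referenced on this page (databaseId, tableId, columnIds, data, is_default, numRows); the argument values are placeholders.

// Hedged sketch: one DataBlockPtr per column id, in matching order.
void insert_rows_example(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                         const int db_id,
                         const int table_id,
                         const std::vector<int>& column_ids,
                         const std::vector<DataBlockPtr>& data_blocks,
                         const size_t num_rows) {
  Fragmenter_Namespace::InsertData insert_data;
  insert_data.databaseId = db_id;
  insert_data.tableId = table_id;
  insert_data.columnIds = column_ids;                       // one entry per inserted column
  insert_data.data = data_blocks;                           // same order as columnIds
  insert_data.is_default.resize(column_ids.size(), false);  // no DEFAULT-valued columns
  insert_data.numRows = num_rows;
  fragmenter.insertData(insert_data);  // takes the insert lock; checkpoints on DISK_LEVEL
}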

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insert_data)
protected

Definition at line 795 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::is_default, DataBlockPtr::numbersPtr, and Fragmenter_Namespace::InsertData::numRows.

795  {
796  // populate deleted system column if it should exist, as it will not come from client
797  std::unique_ptr<int8_t[]> data_for_deleted_column;
798  for (const auto& cit : columnMap_) {
799  if (cit.second.getColumnDesc()->isDeletedCol) {
800  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
801  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
802  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
803  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
804  insert_data.is_default.push_back(false);
805  break;
806  }
807  }
808  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
809  std::unordered_map<int, int> inverseInsertDataColIdMap;
810  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
811  inverseInsertDataColIdMap.insert(
812  std::make_pair(insert_data.columnIds[insertId], insertId));
813  }
814 
815  size_t numRowsLeft = insert_data.numRows;
816  size_t numRowsInserted = 0;
817  vector<DataBlockPtr> dataCopy =
818  insert_data.data; // bc append data will move ptr forward and this violates
819  // constness of InsertData
820  if (numRowsLeft <= 0) {
821  return;
822  }
823 
824  FragmentInfo* currentFragment{nullptr};
825 
826  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
827  // feels fragile
828  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
829  currentFragment = createNewFragment(defaultInsertLevel_);
830  } else {
831  currentFragment = fragmentInfoVec_.back().get();
832  }
833  CHECK(currentFragment);
834 
835  size_t startFragment = fragmentInfoVec_.size() - 1;
836 
837  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
838  // loop until done inserting all rows
839  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
840  size_t rowsLeftInCurrentFragment =
841  maxFragmentRows_ - currentFragment->shadowNumTuples;
842  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
843  if (rowsLeftInCurrentFragment != 0) {
844  for (auto& varLenColInfoIt : varLenColInfo_) {
845  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
846  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
847  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
848  if (insertIdIt != inverseInsertDataColIdMap.end()) {
849  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
850  numRowsToInsert = std::min(numRowsToInsert,
851  colMapIt->second.getNumElemsForBytesInsertData(
852  dataCopy[insertIdIt->second],
853  numRowsToInsert,
854  numRowsInserted,
855  bytesLeft,
856  insert_data.is_default[insertIdIt->second]));
857  }
858  }
859  }
860 
861  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
862  currentFragment = createNewFragment(defaultInsertLevel_);
863  if (numRowsInserted == 0) {
864  startFragment++;
865  }
866  rowsLeftInCurrentFragment = maxFragmentRows_;
867  for (auto& varLenColInfoIt : varLenColInfo_) {
868  varLenColInfoIt.second = 0; // reset byte counter
869  }
870  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
871  for (auto& varLenColInfoIt : varLenColInfo_) {
872  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
873  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
874  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
875  if (insertIdIt != inverseInsertDataColIdMap.end()) {
876  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
877  numRowsToInsert = std::min(numRowsToInsert,
878  colMapIt->second.getNumElemsForBytesInsertData(
879  dataCopy[insertIdIt->second],
880  numRowsToInsert,
881  numRowsInserted,
882  bytesLeft,
883  insert_data.is_default[insertIdIt->second]));
884  }
885  }
886  }
887 
888  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
889  // never be able to insert anything
890 
891  {
893  // for each column, append the data in the appropriate insert buffer
894  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
895  int columnId = insert_data.columnIds[i];
896  auto colMapIt = columnMap_.find(columnId);
897  CHECK(colMapIt != columnMap_.end());
898  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
899  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
900  auto varLenColInfoIt = varLenColInfo_.find(columnId);
901  if (varLenColInfoIt != varLenColInfo_.end()) {
902  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
903  }
904  }
905  if (hasMaterializedRowId_) {
906  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
907  currentFragment->shadowNumTuples;
908  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
909  for (size_t i = 0; i < numRowsToInsert; ++i) {
910  row_id_data[i] = i + startId;
911  }
912  DataBlockPtr rowIdBlock;
913  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
914  auto colMapIt = columnMap_.find(rowIdColId_);
915  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
916  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
917  }
918 
919  currentFragment->shadowNumTuples =
920  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
921  numRowsLeft -= numRowsToInsert;
922  numRowsInserted += numRowsToInsert;
923  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
924  partIt != fragmentInfoVec_.end();
925  ++partIt) {
926  auto fragment_ptr = partIt->get();
927  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
928  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
929  }
930  }
931  }
932  numTuples_ += insert_data.numRows;
934 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::unique_lock< T > unique_lock
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:226
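
The loop above splits a bulk insert across fragments: it first fills whatever room remains in the last fragment (maxFragmentRows_ minus shadowNumTuples), then opens new fragments, additionally capping each batch by the variable-length byte budget (maxChunkSize_). The row-count half of that arithmetic can be sketched in isolation; the toy function below mirrors it under the assumption that no variable-length limits apply.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative only: how insertDataImpl() carves a bulk insert into per-fragment
// batches, ignoring the per-chunk byte limits and the actual appendData calls.
std::vector<size_t> split_insert_into_fragments(size_t num_rows,
                                                size_t max_fragment_rows,
                                                size_t rows_in_last_fragment) {
  std::vector<size_t> rows_per_fragment;
  size_t rows_left = num_rows;
  size_t room = max_fragment_rows - rows_in_last_fragment;
  while (rows_left > 0) {
    if (room == 0) {  // current fragment is full: open a new one
      room = max_fragment_rows;
    }
    const size_t batch = std::min(room, rows_left);
    rows_per_fragment.push_back(batch);
    rows_left -= batch;
    room -= batch;
  }
  return rows_per_fragment;
}

int main() {
  // A 10M-row insert into a table whose last fragment holds 31M of 32M rows
  // lands as 1M into the existing fragment and 9M into a new one.
  for (auto n : split_insert_into_fragments(10'000'000, 32'000'000, 31'000'000)) {
    std::cout << n << '\n';
  }
}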
void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insert_data_struct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; they must be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 487 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

487  {
488  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
490  insertMutex_); // prevent two threads from trying to insert into the same table
491  // simultaneously
492  if (!isAddingNewColumns(insert_data_struct)) {
493  insertDataImpl(insert_data_struct);
494  } else {
495  addColumns(insert_data_struct);
496  }
497 }
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const
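
The NoCheckpoint variants exist so a caller can batch several inserts under one transaction boundary. A minimal sketch of the caller-managed pattern, mirroring the epoch-capture / checkpoint / rollback sequence that insertData() performs internally, follows; the catalog and DataMgr calls match the signatures in the reference lists above, while the surrounding setup (and a disk-resident table) is assumed.

// Sketch: externally managed checkpointing around insertDataNoCheckpoint().
// (OmniSciDB headers for InsertOrderFragmenter, Catalog, and DataMgr assumed.)
void insert_with_external_checkpoint(
    Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
    Catalog_Namespace::Catalog& catalog,
    Data_Namespace::DataMgr& data_mgr,
    Fragmenter_Namespace::InsertData& insert_data) {
  // Capture the table epochs first so a failed insert can be rolled back.
  const auto table_epochs =
      catalog.getTableEpochs(insert_data.databaseId, insert_data.tableId);
  try {
    fragmenter.insertDataNoCheckpoint(insert_data);
    // Persist only after the whole batch succeeded.
    data_mgr.checkpoint(insert_data.databaseId, insert_data.tableId);
  } catch (...) {
    catalog.setTableEpochs(insert_data.databaseId, table_epochs);
    throw;
  }
}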


bool Fragmenter_Namespace::InsertOrderFragmenter::isAddingNewColumns ( const InsertData &  insert_data) const
private

Definition at line 416 of file InsertOrderFragmenter.cpp.

References CHECK, and Fragmenter_Namespace::InsertData::columnIds.

416  {
417  bool all_columns_already_exist = true, all_columns_are_new = true;
418  for (const auto column_id : insert_data.columnIds) {
419  if (columnMap_.find(column_id) == columnMap_.end()) {
420  all_columns_already_exist = false;
421  } else {
422  all_columns_are_new = false;
423  }
424  }
425  // only one should be TRUE
426  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
427  CHECK(either_all_exist_or_all_new);
428  return all_columns_are_new;
429 }
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter & )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::resetSizesFromFragments ( )
overridevirtual

Resets the fragmenter's size-related metadata using the internal fragment info vector. This is typically done after operations, such as vacuuming, which can change fragment sizes.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1028 of file InsertOrderFragmenter.cpp.

1028  {
1030  numTuples_ = 0;
1031  for (const auto& fragment_info : fragmentInfoVec_) {
1032  numTuples_ += fragment_info->getPhysicalNumTuples();
1033  }
1035 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
void Fragmenter_Namespace::InsertOrderFragmenter::setLastFragmentVarLenColumnSizes ( )
private

Definition at line 1037 of file InsertOrderFragmenter.cpp.

1037  {
1038  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1039  // Now need to get the insert buffers for each column - should be last
1040  // fragment
1041  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1042  // TODO: add accessor here for safe indexing
1043  int deviceId =
1044  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1045  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1046  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1047  insertKey.push_back(colIt->first); // column id
1048  insertKey.push_back(lastFragmentId); // fragment id
1049  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1050  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1051  if (varLenColInfoIt != varLenColInfo_.end()) {
1052  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1053  }
1054  }
1055  }
1056 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
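
The chunk key assembled here is just chunkKeyPrefix_ ({db_id, table_id}) extended with the column id and the last fragment id. A small, purely illustrative helper:

#include <vector>

using ChunkKey = std::vector<int>;  // {db_id, table_id, column_id, fragment_id}

// Illustrative only: the key for a column's insert buffer in the last fragment,
// as built by setLastFragmentVarLenColumnSizes().
ChunkKey make_insert_chunk_key(const ChunkKey& chunk_key_prefix,  // {db_id, table_id}
                               const int column_id,
                               const int last_fragment_id) {
  ChunkKey key = chunk_key_prefix;
  key.push_back(column_id);
  key.push_back(last_fragment_id);
  return key;
}
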
void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map,
std::optional< Data_Namespace::MemoryLevel >  memory_level 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 315 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

318  {
319  // synchronize concurrent accesses to fragmentInfoVec_
326  if (shard_ >= 0) {
327  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
328  }
329 
330  CHECK(cd);
331  const auto column_id = cd->columnId;
332  const auto col_itr = columnMap_.find(column_id);
333  CHECK(col_itr != columnMap_.end());
334 
335  for (auto const& fragment : fragmentInfoVec_) {
336  auto stats_itr = stats_map.find(fragment->fragmentId);
337  if (stats_itr != stats_map.end()) {
338  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
339  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
340  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
342  column_id,
343  fragment->fragmentId};
344  auto chunk = Chunk_NS::Chunk::getChunk(cd,
345  &catalog_->getDataMgr(),
346  chunk_key,
347  memory_level.value_or(defaultInsertLevel_),
348  0,
349  chunk_meta_it->second->numBytes,
350  chunk_meta_it->second->numElements);
351  auto buf = chunk->getBuffer();
352  CHECK(buf);
353  if (!buf->hasEncoder()) {
354  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
355  }
356  auto encoder = buf->getEncoder();
357 
358  auto chunk_stats = stats_itr->second;
359 
360  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
361  encoder->getMetadata(old_chunk_metadata);
362  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
363 
364  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
365  // Use the logical type to display data, since the encoding should be ignored
366  const auto logical_ti = cd->columnType.is_dict_encoded_string()
368  : get_logical_type_info(cd->columnType);
369  if (!didResetStats) {
370  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
371  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
372  << DatumToString(chunk_stats.max, logical_ti);
373  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
374  << DatumToString(chunk_stats.min, logical_ti);
375  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
376  continue; // move to next fragment
377  }
378 
379  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
380  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
381  << DatumToString(chunk_stats.max, logical_ti);
382  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
383  << DatumToString(chunk_stats.min, logical_ti);
384  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
385 
386  // Reset fragment metadata map and set buffer to dirty
387  auto new_metadata = std::make_shared<ChunkMetadata>();
388  // Run through fillChunkStats to ensure any transformations to the raw metadata
389  // values get applied (e.g. for date in days)
390  encoder->getMetadata(new_metadata);
391 
392  fragment->setChunkMetadata(column_id, new_metadata);
393  fragment->shadowChunkMetadataMap =
394  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
396  buf->setDirty();
397  }
398  } else {
399  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
400  << ", table " << physicalTableId_
401  << ", column " << column_id;
402  }
403  }
404 }
std::vector< int > ChunkKey
Definition: types.h:36
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:392
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
#define LOG(tag)
Definition: Logger.h:216
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1087
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
std::unique_lock< T > unique_lock
#define CHECK(condition)
Definition: Logger.h:222
bool is_dict_encoded_string() const
Definition: sqltypes.h:548
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:316
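
A typical caller builds a per-fragment map of replacement ChunkStats and hands it to updateChunkStats(). The sketch below narrows an integer column's recorded range for a single fragment; the ChunkStats members (min, max, has_nulls) and Datum::bigintval are taken from the listing above, the OmniSciDB headers are assumed, and the caller is assumed to hold whatever higher-level locks the warning requires.

#include <cstdint>
#include <optional>
#include <unordered_map>

// Sketch: narrow the recorded min/max for one fragment of a BIGINT column.
void narrow_bigint_column_stats(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                                const ColumnDescriptor* cd,
                                const int fragment_id,
                                const int64_t new_min,
                                const int64_t new_max) {
  ChunkStats stats;
  stats.min.bigintval = new_min;
  stats.max.bigintval = new_max;
  stats.has_nulls = false;

  std::unordered_map<int, ChunkStats> stats_map{{fragment_id, stats}};
  // std::nullopt keeps the fragmenter's default insert memory level.
  fragmenter.updateChunkStats(cd, stats_map, std::nullopt);
}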


std::optional< ChunkUpdateStats > Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 636 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, CHECK_GT, Fragmenter_Namespace::ChunkUpdateStats::chunk, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), Fragmenter_Namespace::ChunkUpdateStats::fragment_rows_count, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::ChunkUpdateStats::new_values_stats, Fragmenter_Namespace::ChunkUpdateStats::old_values_stats, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, TableDescriptor::tableId, temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, Fragmenter_Namespace::anonymous_namespace{UpdelStorage.cpp}::update_metadata(), foreign_storage::update_stats(), updateColumnMetadata(), Fragmenter_Namespace::ChunkUpdateStats::updated_rows_count, NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

645  {
646  updel_roll.catalog = catalog;
647  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
648  updel_roll.memoryLevel = memory_level;
649 
650  const size_t ncore = cpu_threads();
651  const auto nrow = frag_offsets.size();
652  const auto n_rhs_values = rhs_values.size();
653  if (0 == nrow) {
654  return {};
655  }
656  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
657 
658  auto fragment_ptr = getFragmentInfo(fragment_id);
659  auto& fragment = *fragment_ptr;
660  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
661  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
662  ChunkKey chunk_key{
663  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
664  auto chunk = Chunk_NS::Chunk::getChunk(cd,
665  &catalog->getDataMgr(),
666  chunk_key,
668  0,
669  chunk_meta_it->second->numBytes,
670  chunk_meta_it->second->numElements);
671 
672  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
673 
674  // parallel update elements
675  std::vector<std::future<void>> threads;
676 
677  const auto segsz = (nrow + ncore - 1) / ncore;
678  auto dbuf = chunk->getBuffer();
679  auto dbuf_addr = dbuf->getMemoryPtr();
680  dbuf->setUpdated();
681  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
682  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
683  threads.emplace_back(std::async(
684  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
685  SQLTypeInfo lhs_type = cd->columnType;
686 
687  // !! not sure if this is an undocumented convention or a bug, but for a sharded
688  // table the dictionary id of a encoded string column is not specified by
689  // comp_param in physical table but somehow in logical table :) comp_param in
690  // physical table is always 0, so need to adapt accordingly...
691  auto cdl = (shard_ < 0)
692  ? cd
693  : catalog->getMetadataForColumn(
694  catalog->getLogicalTableId(td->tableId), cd->columnId);
695  CHECK(cdl);
696  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
697  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
698  lhs_type, &decimalOverflowValidator);
699  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
700  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
701  lhs_type, &dateDaysOverflowValidator);
702 
703  StringDictionary* stringDict{nullptr};
704  if (lhs_type.is_string()) {
705  CHECK(kENCODING_DICT == lhs_type.get_compression());
706  auto dictDesc = const_cast<DictDescriptor*>(
707  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
708  CHECK(dictDesc);
709  stringDict = dictDesc->stringDict.get();
710  CHECK(stringDict);
711  }
712 
713  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
714  const auto roffs = frag_offsets[r];
715  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
716  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
717  ScalarTargetValue sv2;
718 
719  // Subtle here is on the two cases of string-to-string assignments, when
720  // upstream passes RHS string as a string index instead of a preferred "real
721  // string".
722  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
723  // index
724  // in this layer, so if upstream passes a str idx here, an
725  // exception is thrown.
726  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
727  // str idx.
728  if (rhs_type.is_string()) {
729  if (const auto vp = boost::get<int64_t>(sv)) {
730  auto dictDesc = const_cast<DictDescriptor*>(
731  catalog->getMetadataForDict(rhs_type.get_comp_param()));
732  if (nullptr == dictDesc) {
733  throw std::runtime_error(
734  "UPDATE does not support cast from string literal to string "
735  "column.");
736  }
737  auto stringDict = dictDesc->stringDict.get();
738  CHECK(stringDict);
739  sv2 = NullableString(stringDict->getString(*vp));
740  sv = &sv2;
741  }
742  }
743 
744  if (const auto vp = boost::get<int64_t>(sv)) {
745  auto v = *vp;
746  if (lhs_type.is_string()) {
747  throw std::runtime_error("UPDATE does not support cast to string.");
748  }
749  int64_t old_val;
750  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
751  // Handle special case where date column with date in days encoding stores
752  // metadata in epoch seconds.
753  if (lhs_type.is_date_in_days()) {
755  }
756  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
757  if (lhs_type.is_decimal()) {
758  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
759  int64_t decimal_val;
760  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
761  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
762  lhs_type.get_notnull() == false)
763  ? v
764  : decimal_val;
766  lhs_type, update_stats_per_thread[c], target_value, old_val);
767  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
768  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
769  if (positive_v_and_negative_d || negative_v_and_positive_d) {
770  throw std::runtime_error(
771  "Data conversion overflow on " + std::to_string(v) +
772  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
773  std::to_string(rhs_type.get_scale()) + ") to (" +
774  std::to_string(lhs_type.get_dimension()) + ", " +
775  std::to_string(lhs_type.get_scale()) + ")");
776  }
777  } else if (is_integral(lhs_type)) {
778  if (lhs_type.is_date_in_days()) {
779  // Store meta values in seconds
780  if (lhs_type.get_size() == 2) {
781  nullAwareDateOverflowValidator.validate<int16_t>(v);
782  } else {
783  nullAwareDateOverflowValidator.validate<int32_t>(v);
784  }
785  int64_t days;
786  get_scalar<int64_t>(data_ptr, lhs_type, days);
787  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
788  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
789  lhs_type.get_notnull() == false)
790  ? NullSentinelSupplier()(lhs_type, v)
791  : seconds;
793  lhs_type, update_stats_per_thread[c], target_value, old_val);
794  } else {
795  int64_t target_value;
796  if (rhs_type.is_decimal()) {
797  target_value = round(decimal_to_double(rhs_type, v));
798  } else {
799  target_value = v;
800  }
802  lhs_type, update_stats_per_thread[c], target_value, old_val);
803  }
804  } else {
805  if (rhs_type.is_decimal()) {
806  update_metadata(lhs_type,
807  update_stats_per_thread[c],
808  decimal_to_double(rhs_type, v),
809  double(old_val));
810  } else {
811  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
812  }
813  }
814  } else if (const auto vp = boost::get<double>(sv)) {
815  auto v = *vp;
816  if (lhs_type.is_string()) {
817  throw std::runtime_error("UPDATE does not support cast to string.");
818  }
819  double old_val;
820  get_scalar<double>(data_ptr, lhs_type, old_val);
821  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
822  if (lhs_type.is_integer()) {
824  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
825  } else if (lhs_type.is_fp()) {
827  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
828  } else {
829  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
830  "LHS with a floating RHS.";
831  }
832  } else if (const auto vp = boost::get<float>(sv)) {
833  auto v = *vp;
834  if (lhs_type.is_string()) {
835  throw std::runtime_error("UPDATE does not support cast to string.");
836  }
837  float old_val;
838  get_scalar<float>(data_ptr, lhs_type, old_val);
839  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
840  if (lhs_type.is_integer()) {
842  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
843  } else {
844  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
845  }
846  } else if (const auto vp = boost::get<NullableString>(sv)) {
847  const auto s = boost::get<std::string>(vp);
848  const auto sval = s ? *s : std::string("");
849  if (lhs_type.is_string()) {
850  decltype(stringDict->getOrAdd(sval)) sidx;
851  {
852  std::unique_lock<std::mutex> lock(temp_mutex_);
853  sidx = stringDict->getOrAdd(sval);
854  }
855  int64_t old_val;
856  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
857  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
859  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
860  } else if (sval.size() > 0) {
861  auto dval = std::atof(sval.data());
862  if (lhs_type.is_boolean()) {
863  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
864  } else if (lhs_type.is_time()) {
865  throw std::runtime_error(
866  "Date/Time/Timestamp update not supported through translated "
867  "string path.");
868  }
869  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
870  double old_val;
871  get_scalar<double>(data_ptr, lhs_type, old_val);
872  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
874  lhs_type, update_stats_per_thread[c], double(dval), old_val);
875  } else {
876  int64_t old_val;
877  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
878  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
880  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
881  }
882  } else {
883  put_null(data_ptr, lhs_type, cd->columnName);
884  update_stats_per_thread[c].new_values_stats.has_null = true;
885  }
886  } else {
887  CHECK(false);
888  }
889  }
890  }));
891  if (threads.size() >= (size_t)cpu_threads()) {
892  wait_cleanup_threads(threads);
893  }
894  }
895  wait_cleanup_threads(threads);
896 
897  // for unit test
899  if (cd->isDeletedCol) {
900  const auto deleted_offsets = getVacuumOffsets(chunk);
901  if (deleted_offsets.size() > 0) {
902  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
903  return {};
904  }
905  }
906  }
907  ChunkUpdateStats update_stats;
908  for (size_t c = 0; c < ncore; ++c) {
909  update_metadata(update_stats.new_values_stats,
910  update_stats_per_thread[c].new_values_stats);
911  update_metadata(update_stats.old_values_stats,
912  update_stats_per_thread[c].old_values_stats);
913  }
914 
915  CHECK_GT(fragment.shadowNumTuples, size_t(0));
917  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
918  update_stats.updated_rows_count = nrow;
919  update_stats.fragment_rows_count = fragment.shadowNumTuples;
920  update_stats.chunk = chunk;
921  return update_stats;
922 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
std::vector< int > ChunkKey
Definition: types.h:36
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4740
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:153
int logicalTableId
Definition: UpdelRoll.h:54
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:222
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
Descriptor for a dictionary for a string column.
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:510
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:513
std::string columnName
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:154
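
From the DML layer's point of view, the call boils down to a fragment id, a list of row offsets, and the replacement values. A minimal sketch of broadcasting one BIGINT value over a set of offsets follows; the catalog, descriptors, rhs_type, and OmniSciDB headers are assumed to come from the surrounding UPDATE machinery, and updel_roll must still be committed or rolled back by the caller.

// Sketch: set one column to a single value for the given row offsets.
void set_rows_to_value(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                       const Catalog_Namespace::Catalog* catalog,
                       const TableDescriptor* td,
                       const ColumnDescriptor* cd,
                       const SQLTypeInfo& rhs_type,
                       const int fragment_id,
                       const std::vector<uint64_t>& frag_offsets,
                       const int64_t new_value) {
  UpdelRoll updel_roll;
  // A single RHS value is broadcast to every offset; the method also accepts
  // one value per offset.
  std::vector<ScalarTargetValue> rhs_values{ScalarTargetValue(new_value)};
  auto update_stats = fragmenter.updateColumn(catalog,
                                              td,
                                              cd,
                                              fragment_id,
                                              frag_offsets,
                                              rhs_values,
                                              rhs_type,
                                              Data_Namespace::CPU_LEVEL,
                                              updel_roll);
  if (update_stats) {
    // updated_rows_count, new_values_stats, etc. feed the deferred metadata
    // update carried by updel_roll.
  }
}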


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 55 of file UpdelStorage.cpp.

References updateColumn().

63  {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor *  cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata >  metadata 
)
overridevirtual

Updates the metadata for a column chunk.

Parameters
cd - ColumnDescriptor for the column
fragment_id - Fragment id of the chunk within the column
metadata - shared_ptr of the metadata to update the column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 302 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

305  {
306  // synchronize concurrent accesses to fragmentInfoVec_
308 
309  CHECK(metadata.get());
310  auto fragment_info = getFragmentInfo(fragment_id);
311  CHECK(fragment_info);
312  fragment_info->setChunkMetadata(cd->columnId, metadata);
313 }
std::unique_lock< T > unique_lock
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:222
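
A caller that has recomputed a chunk's metadata out of band can publish it through this method. The sketch below fills only the ChunkMetadata members visible in the listings above (numBytes, numElements) and leaves chunkStats to the caller; it assumes the OmniSciDB headers and the usual higher-level locking are already in place.

#include <cstddef>
#include <memory>

// Sketch: replace the cached metadata for one column chunk.
void replace_chunk_metadata(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                            const ColumnDescriptor* cd,
                            const int fragment_id,
                            const size_t num_bytes,
                            const size_t num_elements) {
  auto metadata = std::make_shared<ChunkMetadata>();
  metadata->numBytes = num_bytes;
  metadata->numElements = num_elements;
  // metadata->chunkStats (min/max/has_nulls) should also be filled in before
  // publishing a complete, consistent object.
  fragmenter.updateColumnChunkMetadata(cd, fragment_id, metadata);
}
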
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const UpdateValuesStats &  update_values_stats,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 924 of file UpdelStorage.cpp.

References UpdelRoll::catalog, ColumnDescriptor::columnId, ColumnDescriptor::columnType, fragmentInfoMutex_, UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getMetadataForTable(), Fragmenter_Namespace::UpdateValuesStats::has_null, SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, Fragmenter_Namespace::UpdateValuesStats::max_double, Fragmenter_Namespace::UpdateValuesStats::max_int64t, Fragmenter_Namespace::UpdateValuesStats::min_double, Fragmenter_Namespace::UpdateValuesStats::min_int64t, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

930  {
932  auto buffer = chunk->getBuffer();
933  const auto& lhs_type = cd->columnType;
934 
935  auto encoder = buffer->getEncoder();
936  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
937  static_assert(std::is_same<decltype(min), decltype(max)>::value,
938  "Type mismatch on min/max");
939  if (has_null) {
940  encoder->updateStats(decltype(min)(), true);
941  }
942  if (max < min) {
943  return;
944  }
945  encoder->updateStats(min, false);
946  encoder->updateStats(max, false);
947  };
948 
949  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
950  update_stats(new_values_stats.min_int64t,
951  new_values_stats.max_int64t,
952  new_values_stats.has_null);
953  } else if (lhs_type.is_fp()) {
954  update_stats(new_values_stats.min_double,
955  new_values_stats.max_double,
956  new_values_stats.has_null);
957  } else if (lhs_type.is_decimal()) {
958  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
959  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
960  new_values_stats.has_null);
961  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
962  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
963  update_stats(new_values_stats.min_int64t,
964  new_values_stats.max_int64t,
965  new_values_stats.has_null);
966  }
967  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
968  auto chunk_metadata =
969  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
970  buffer->getEncoder()->getMetadata(chunk_metadata);
971 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
std::unique_lock< T > unique_lock
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:513
bool is_integral(const SQLTypeInfo &t)
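
The update_stats lambda above folds the new values' min/max/null flags into the chunk encoder: a null only raises the flag, and an empty range (max < min) contributes nothing. The same rule, written as a standalone helper over a plain min/max/has-nulls triple, purely for illustration:

#include <algorithm>
#include <cstdint>

struct SimpleStats {
  int64_t min{INT64_MAX};
  int64_t max{INT64_MIN};
  bool has_nulls{false};
};

// Illustrative only: fold a batch's stats into an accumulator the way the
// update_stats lambda feeds the encoder.
void fold_stats(SimpleStats& acc,
                const int64_t new_min,
                const int64_t new_max,
                const bool new_has_null) {
  if (new_has_null) {
    acc.has_nulls = true;
  }
  if (new_max < new_min) {  // nothing but nulls were written
    return;
  }
  acc.min = std::min(acc.min, new_min);
  acc.max = std::max(acc.max, new_max);
}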


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll,
Executor *  executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 268 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::InsertData::columnIds, cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, g_enable_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), Fragmenter_Namespace::InsertData::is_default, SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

278  {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
326  num_rows,
327  *catalog,
328  sourceDataMetaInfo,
329  targetDescriptor,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().get_comp_param(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr,
340  nullptr};
341  auto converter = factory.create(param);
342  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
343 
344  if (targetDescriptor->columnType.is_geometry()) {
345  // geometry columns are composites
346  // need to skip chunks, depending on geo type
347  switch (targetDescriptor->columnType.get_type()) {
348  case kMULTIPOLYGON:
349  indexOfChunk += 5;
350  break;
351  case kPOLYGON:
352  indexOfChunk += 4;
353  break;
354  case kLINESTRING:
355  indexOfChunk += 2;
356  break;
357  case kPOINT:
358  indexOfChunk += 1;
359  break;
360  default:
361  CHECK(false); // not supported
362  }
363  }
364  } else {
365  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
366  std::unique_ptr<ChunkToInsertDataConverter> converter;
367 
368  if (chunk_cd->columnType.is_fixlen_array()) {
369  converter =
370  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
371  } else if (chunk_cd->columnType.is_string()) {
372  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
373  } else if (chunk_cd->columnType.is_geometry()) {
374  // the logical geo column is a string column
375  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
376  } else {
377  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
378  }
379 
380  chunkConverters.push_back(std::move(converter));
381 
382  } else if (chunk_cd->columnType.is_date_in_days()) {
383  /* Q: Why do we need this?
384  A: In variable length updates path we move the chunk content of column
385  without decoding. Since it again passes through DateDaysEncoder
386  the expected value should be in seconds, but here it will be in days.
387  Therefore, using DateChunkConverter chunk values are being scaled to
388  seconds which then ultimately encoded in days in DateDaysEncoder.
389  */
390  std::unique_ptr<ChunkToInsertDataConverter> converter;
391  const size_t physical_size = chunk_cd->columnType.get_size();
392  if (physical_size == 2) {
393  converter =
394  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
395  } else if (physical_size == 4) {
396  converter =
397  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
398  } else {
399  CHECK(false);
400  }
401  chunkConverters.push_back(std::move(converter));
402  } else {
403  std::unique_ptr<ChunkToInsertDataConverter> converter;
404  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
405  int logical_size = logical_type.get_size();
406  int physical_size = chunk_cd->columnType.get_size();
407 
408  if (logical_type.is_string()) {
409  // for dicts -> logical = physical
410  logical_size = physical_size;
411  }
412 
413  if (8 == physical_size) {
414  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
415  num_rows, chunk.get());
416  } else if (4 == physical_size) {
417  if (8 == logical_size) {
418  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
419  num_rows, chunk.get());
420  } else {
421  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
422  num_rows, chunk.get());
423  }
424  } else if (2 == chunk_cd->columnType.get_size()) {
425  if (8 == logical_size) {
426  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
427  num_rows, chunk.get());
428  } else if (4 == logical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
430  num_rows, chunk.get());
431  } else {
432  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
433  num_rows, chunk.get());
434  }
435  } else if (1 == chunk_cd->columnType.get_size()) {
436  if (8 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
438  num_rows, chunk.get());
439  } else if (4 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
441  num_rows, chunk.get());
442  } else if (2 == logical_size) {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
444  num_rows, chunk.get());
445  } else {
446  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
447  num_rows, chunk.get());
448  }
449  } else {
450  CHECK(false); // unknown
451  }
452 
453  chunkConverters.push_back(std::move(converter));
454  }
455  }
456  }
457 
458  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
459  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
460 
461  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
462  bool* deletedChunkBuffer =
463  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
464 
465  std::atomic<size_t> row_idx{0};
466 
467  auto row_converter = [&sourceDataProvider,
468  &sourceDataConverters,
469  &indexOffFragmentOffsetColumn,
470  &chunkConverters,
471  &deletedChunkBuffer,
472  &row_idx](size_t indexOfEntry) -> void {
473  // convert the source data
474  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
475  if (row.empty()) {
476  return;
477  }
478 
479  size_t indexOfRow = row_idx.fetch_add(1);
480 
481  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
482  if (sourceDataConverters[col]) {
483  const auto& mapd_variant = row[col];
484  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
485  }
486  }
487 
488  auto scalar = checked_get(
489  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
490  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
491 
492  // convert the remaining chunks
493  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
494  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
495  }
496 
497  // now mark the row as deleted
498  deletedChunkBuffer[indexInChunkBuffer] = true;
499  };
500 
501  bool can_go_parallel = num_rows > 20000;
502 
503  if (can_go_parallel) {
504  const size_t num_worker_threads = cpu_threads();
505  std::vector<std::future<void>> worker_threads;
506  for (size_t i = 0,
507  start_entry = 0,
508  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
509  i < num_worker_threads && start_entry < num_entries;
510  ++i, start_entry += stride) {
511  const auto end_entry = std::min(start_entry + stride, num_rows);
512  worker_threads.push_back(std::async(
514  [&row_converter](const size_t start, const size_t end) {
515  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
516  row_converter(indexOfRow);
517  }
518  },
519  start_entry,
520  end_entry));
521  }
522 
523  for (auto& child : worker_threads) {
524  child.wait();
525  }
526 
527  } else {
528  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
529  row_converter(entryIdx);
530  }
531  }
532 
534  insert_data.databaseId = catalog->getCurrentDB().dbId;
535  insert_data.tableId = td->tableId;
536 
537  for (size_t i = 0; i < chunkConverters.size(); i++) {
538  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
539  continue;
540  }
541 
542  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
543  if (sourceDataConverters[i]) {
544  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
545  }
546  continue;
547  }
548 
549  insert_data.numRows = num_rows;
550  insert_data.is_default.resize(insert_data.columnIds.size(), false);
551  insertDataNoCheckpoint(insert_data);
552 
553  // update metadata for deleted chunk as we are doing special handling
554  auto chunkMetadata =
555  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
556  chunkMetadata->chunkStats.max.boolval = 1;
557 
558  // I'm not completely sure that we need to do this in the fragmenter and on the buffer
559  // but leaving this alone for now
560  if (!deletedChunk->getBuffer()->hasEncoder()) {
561  deletedChunk->initEncoder();
562  }
563  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
564 
565  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
566  // An append to the same fragment will increase shadowNumTuples.
567  // Update NumElems in this case. Otherwise, use existing NumElems.
568  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
569  }
570  deletedChunk->getBuffer()->setUpdated();
571 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
bool is_varlen_update
Definition: UpdelRoll.h:57
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1087
std::vector< bool > is_default
Definition: Fragmenter.h:75
bool g_enable_string_functions
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
future< Result > async(Fn &&fn, Args &&...args)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4740
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:54
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
#define CHECK(condition)
Definition: Logger.h:222
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool is_string() const
Definition: sqltypes.h:510
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:24
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
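
When more than ~20,000 rows are touched, the row_converter above is striped across cpu_threads() workers with std::async. The work-partitioning pattern, extracted into a small generic helper for illustration (process_row stands in for the per-entry conversion closure):

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

// Illustrative only: stripe [0, num_entries) across num_workers async tasks,
// the way updateColumns() parallelizes its row_converter.
template <typename F>
void run_striped(const size_t num_entries, const size_t num_workers, F process_row) {
  std::vector<std::future<void>> workers;
  const size_t stride =
      (num_entries + num_workers - 1) / std::max<size_t>(num_workers, 1);
  for (size_t start = 0; start < num_entries; start += stride) {
    const size_t end = std::min(start + stride, num_entries);
    workers.push_back(std::async(std::launch::async, [=] {
      for (size_t i = start; i < end; ++i) {
        process_row(i);
      }
    }));
  }
  for (auto& worker : workers) {
    worker.wait();  // join all workers before returning
  }
}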


void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 973 of file UpdelStorage.cpp.

References fragmentInfoMutex_, UpdelRoll::getChunkMetadataMap(), and UpdelRoll::getNumTuple().

975  {
977  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
978  auto& fragmentInfo = *key.second;
979  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
980  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
981  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
982  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
983 }
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
std::unique_lock< T > unique_lock
size_t getNumTuple(const MetaDataKey &key) const


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1077 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1080  {
1081  const auto cd = chunk->getColumnDesc();
1082  const auto& col_type = cd->columnType;
1083  auto data_buffer = chunk->getBuffer();
1084  auto data_addr = data_buffer->getMemoryPtr();
1085  auto element_size =
1086  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1087  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1088  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1089  size_t nbytes_fix_data_to_keep = 0;
1090  auto nrows_to_vacuum = frag_offsets.size();
1091  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1092  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1093  auto is_last_one = irow == nrows_to_vacuum;
1094  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1095  auto maddr_to_vacuum = data_addr;
1096  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1097  if (nrows_to_keep > 0) {
1098  auto nbytes_to_keep = nrows_to_keep * element_size;
1099  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1100  // move curr fixlen row block toward front
1101  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1102  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1103  nbytes_to_keep);
1104  }
1105  irow_of_blk_to_fill += nrows_to_keep;
1106  nbytes_fix_data_to_keep += nbytes_to_keep;
1107  }
1108  irow_of_blk_to_keep = irow_to_vacuum + 1;
1109  }
1110  return nbytes_fix_data_to_keep;
1111 }
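
The compaction idea is: walk the sorted list of deleted row offsets and memmove each surviving block of rows toward the front, tracking where the next block should land. A standalone sketch over a plain byte buffer of fixed-size rows (deleted_rows assumed sorted ascending), for illustration only:

#include <cstddef>
#include <cstring>
#include <vector>

// Illustrative only: vacuum fixed-width rows out of a flat buffer, returning
// the number of bytes kept (what vacuum_fixlen_rows reports for the data chunk).
size_t compact_fixlen(std::vector<unsigned char>& buffer,
                      const size_t element_size,
                      const size_t num_rows,
                      const std::vector<size_t>& deleted_rows) {
  size_t next_row_to_keep = 0;  // head of the next surviving row block
  size_t fill_row = 0;          // where that block should land
  for (size_t i = 0; i <= deleted_rows.size(); ++i) {
    const bool is_last = (i == deleted_rows.size());
    const size_t deleted_row = is_last ? num_rows : deleted_rows[i];
    const size_t rows_to_keep = deleted_row - next_row_to_keep;
    if (rows_to_keep > 0) {
      if (fill_row != next_row_to_keep) {
        // move the surviving block toward the front
        std::memmove(buffer.data() + fill_row * element_size,
                     buffer.data() + next_row_to_keep * element_size,
                     rows_to_keep * element_size);
      }
      fill_row += rows_to_keep;
    }
    next_row_to_keep = deleted_row + 1;
  }
  return fill_row * element_size;
}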


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1191 of file UpdelStorage.cpp.

References CHECK, Fragmenter_Namespace::get_buffer_offset(), Fragmenter_Namespace::get_null_padding(), Fragmenter_Namespace::get_var_len_null_array_indexes(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1194  {
1195  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1196  auto data_buffer = chunk->getBuffer();
1197  CHECK(data_buffer);
1198  auto index_buffer = chunk->getIndexBuf();
1199  CHECK(index_buffer);
1200  auto data_addr = data_buffer->getMemoryPtr();
1201  auto indices_addr = index_buffer->getMemoryPtr();
1202  CHECK(indices_addr);
1203  auto index_array = (StringOffsetT*)indices_addr;
1204  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1205  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1206  size_t nbytes_fix_data_to_keep = 0;
1207  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1208  size_t null_padding =
1209  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1210  size_t nbytes_var_data_to_keep = null_padding;
1211  auto null_array_indexes = get_var_len_null_array_indexes(
1212  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1213  auto nrows_to_vacuum = frag_offsets.size();
1214  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1215  auto is_last_one = irow == nrows_to_vacuum;
1216  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1217  auto maddr_to_vacuum = data_addr;
1218  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1219  if (nrows_to_keep > 0) {
1220  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1221  auto deleted_row_start_offset =
1222  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1223  auto kept_row_start_offset =
1224  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1225  auto nbytes_to_keep =
1226  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1227  kept_row_start_offset;
1228  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1229  if (nbytes_to_keep > 0) {
1230  CHECK(data_addr);
1231  // move curr varlen row block toward front
1232  memmove(data_addr + ibyte_var_data_to_keep,
1233  data_addr + kept_row_start_offset,
1234  nbytes_to_keep);
1235  }
1236 
1237  const auto base_offset = kept_row_start_offset;
1238  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1239  auto update_index = irow_of_blk_to_keep + i;
1240  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1241  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1242  }
1243  }
1244  nbytes_var_data_to_keep += nbytes_to_keep;
1245  maddr_to_vacuum = indices_addr;
1246 
1247  constexpr static auto index_element_size = sizeof(StringOffsetT);
1248  nbytes_to_keep = nrows_to_keep * index_element_size;
1249  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1250  // move curr fixlen row block toward front
1251  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1252  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1253  nbytes_to_keep);
1254  }
1255  irow_of_blk_to_fill += nrows_to_keep;
1256  nbytes_fix_data_to_keep += nbytes_to_keep;
1257  }
1258  irow_of_blk_to_keep = irow_to_vacuum + 1;
1259  }
1260 
1261  // Set expected null padding, last offset, and negative values for null array offsets.
1262  index_array[0] = null_padding;
1263  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1264  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1265  if (!is_varlen_array) {
1266  CHECK(null_array_indexes.empty());
1267  }
1268  for (auto index : null_array_indexes) {
1269  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1270  }
1271  return nbytes_var_data_to_keep;
1272 }

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
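
The variable-length case also has to keep the StringOffsetT index in step with the payload bytes it points into. Below is a simplified, hypothetical sketch of that bookkeeping (compact_varlen and its arguments are not part of the class); it assumes no null padding and no null arrays, whereas the member function above additionally re-applies the null padding and marks null array entries by negating the offset that follows them.

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

using StringOffsetT = int32_t;

// Hypothetical illustration of variable-length row vacuuming. index_array holds
// nrows + 1 non-negative offsets: entry i is the start of row i's payload and
// entry nrows is the total payload size. Rows listed in deleted_rows (sorted
// ascending) are squeezed out of both the payload buffer and the offset index.
// Returns the number of payload bytes kept.
size_t compact_varlen(uint8_t* data,
                      StringOffsetT* index_array,
                      size_t nrows,
                      const std::vector<uint64_t>& deleted_rows) {
  int64_t next_row_to_keep = 0;  // head of the next surviving row block
  int64_t fill_row = 0;          // index slot the block is moved down to
  size_t nbytes_kept = 0;        // compacted payload size so far
  for (size_t i = 0; i <= deleted_rows.size(); ++i) {
    const bool is_last = (i == deleted_rows.size());
    const int64_t deleted_row = is_last ? static_cast<int64_t>(nrows)
                                        : static_cast<int64_t>(deleted_rows[i]);
    const int64_t nrows_to_keep = deleted_row - next_row_to_keep;
    if (nrows_to_keep > 0) {
      const StringOffsetT block_start = index_array[next_row_to_keep];
      const StringOffsetT block_end = index_array[deleted_row];
      const size_t block_bytes = block_end - block_start;
      if (fill_row != next_row_to_keep) {
        // Slide the surviving payload down over the vacuumed rows.
        std::memmove(data + nbytes_kept, data + block_start, block_bytes);
        // Rewrite the start offsets of the rows that just moved, writing them
        // directly into their final (lower) index slots.
        for (int64_t r = 0; r < nrows_to_keep; ++r) {
          index_array[fill_row + r] = static_cast<StringOffsetT>(
              nbytes_kept + (index_array[next_row_to_keep + r] - block_start));
        }
      }
      nbytes_kept += block_bytes;
      fill_row += nrows_to_keep;
    }
    next_row_to_keep = deleted_row + 1;  // skip the vacuumed row itself
  }
  // Terminal offset records the compacted payload size.
  index_array[nrows - deleted_rows.size()] = static_cast<StringOffsetT>(nbytes_kept);
  return nbytes_kept;
}

The sketch writes each rewritten offset into its final slot before that slot is read again, which keeps the single pass correct; the member function reaches the same state by updating the offsets in place and then sliding the index entries down with a second memmove.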

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected

std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 192 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to the chunk for that column

Definition at line 194 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 198 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 215 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 209 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 211 of file InsertOrderFragmenter.h.

Referenced by updateColumnMetadata(), and updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 196 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 217 of file InsertOrderFragmenter.h.

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 213 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 207 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 208 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 220 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected

int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 218 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 245 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 216 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 219 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: