OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>

+ Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
+ Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
size_t getNumFragments () override
 returns the number of fragments in a table More...
 
TableInfo getFragmentsForQuery () override
 returns (inside QueryInfo) object all ids and row sizes of fragments More...
 
void insertData (InsertData &insert_data_struct) override
 appends data onto the most recently occuring fragment, creating a new one if necessary More...
 
void insertChunks (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments. More...
 
void insertDataNoCheckpoint (InsertData &insert_data_struct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and checkpoints taken needs to be managed externally. More...
 
void insertChunksNoCheckpoint (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments; no locks or checkpoints taken. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
 Update chunk stats. More...
 
FragmentInfogetFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
std::optional< ChunkUpdateStatsupdateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
void resetSizesFromFragments () override
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfocreateNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insert_data)
 
void insertChunksImpl (const InsertChunks &insert_chunk)
 
void addColumns (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenteroperator= (const InsertOrderFragmenter &)
 
FragmentInfogetFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::ChunkcolumnMap_
 
std::vector< std::unique_ptr
< Chunk_NS::Chunk > > 
tracked_in_memory_chunks_
 
std::deque< std::unique_ptr
< FragmentInfo > > 
fragmentInfoVec_
 
Data_Namespace::DataMgrdataMgr_
 
Catalog_Namespace::Catalogcatalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
heavyai::shared_mutex fragmentInfoMutex_
 
heavyai::shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Private Member Functions

bool isAddingNewColumns (const InsertData &insert_data) const
 
void dropFragmentsToSizeNoInsertLock (const size_t max_rows)
 
void setLastFragmentVarLenColumnSizes ()
 
void insertChunksIntoFragment (const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr dataMgr,
Catalog_Namespace::Catalog catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 95 of file InsertOrderFragmenter.cpp.

95 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::addColumns ( const InsertData insertDataStruct)
protected

Definition at line 499 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

499  {
500  // synchronize concurrent accesses to fragmentInfoVec_
502  size_t numRowsLeft = insertDataStruct.numRows;
503  for (const auto columnId : insertDataStruct.columnIds) {
504  CHECK(columnMap_.end() == columnMap_.find(columnId));
505  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
506  CHECK(columnDesc);
507  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
508  }
509  try {
510  for (auto const& fragmentInfo : fragmentInfoVec_) {
511  fragmentInfo->shadowChunkMetadataMap =
512  fragmentInfo->getChunkMetadataMapPhysicalCopy();
513  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
514  size_t numRowsCanBeInserted;
515  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
516  auto columnId = insertDataStruct.columnIds[i];
517  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
518  CHECK(colDesc);
519  CHECK(columnMap_.find(columnId) != columnMap_.end());
520 
521  ChunkKey chunkKey = chunkKeyPrefix_;
522  chunkKey.push_back(columnId);
523  chunkKey.push_back(fragmentInfo->fragmentId);
524 
525  auto colMapIt = columnMap_.find(columnId);
526  auto& chunk = colMapIt->second;
527  if (chunk.isChunkOnDevice(
528  dataMgr_,
529  chunkKey,
531  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
532  dataMgr_->deleteChunksWithPrefix(chunkKey);
533  }
534  chunk.createChunkBuffer(
535  dataMgr_,
536  chunkKey,
538  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
539  chunk.initEncoder();
540 
541  try {
542  DataBlockPtr dataCopy = insertDataStruct.data[i];
543  auto size = colDesc->columnType.get_size();
544  if (0 > size) {
545  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
546  varLenColInfo_[columnId] = 0;
547  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
548  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
549  } else {
550  numRowsCanBeInserted = maxChunkSize_ / size;
551  }
552 
553  // FIXME: abort a case in which new column is wider than existing columns
554  if (numRowsCanBeInserted < numRowsToInsert) {
555  throw std::runtime_error("new column '" + colDesc->columnName +
556  "' wider than existing columns is not supported");
557  }
558 
559  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
560  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
561 
562  // update total size of var-len column in (actually the last) fragment
563  if (0 > size) {
564  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
565  varLenColInfo_[columnId] = chunk.getBuffer()->size();
566  }
567  } catch (...) {
568  dataMgr_->deleteChunksWithPrefix(chunkKey);
569  throw;
570  }
571  }
572  numRowsLeft -= numRowsToInsert;
573  }
574  CHECK(0 == numRowsLeft);
575  } catch (const std::exception& e) {
576  for (const auto columnId : insertDataStruct.columnIds) {
577  columnMap_.erase(columnId);
578  }
579  throw e;
580  }
581 
582  for (auto const& fragmentInfo : fragmentInfoVec_) {
583  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
584  }
585 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1278 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), getChunksForAllColumns(), getFragmentInfo(), Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), UpdelRoll::setNumTuple(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1283  {
1284  auto fragment_ptr = getFragmentInfo(fragment_id);
1285  auto& fragment = *fragment_ptr;
1286  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1287  const auto ncol = chunks.size();
1288 
1289  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1290 
1291  // parallel delete columns
1292  std::vector<std::future<void>> threads;
1293  auto nrows_to_vacuum = frag_offsets.size();
1294  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1295  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1296 
1297  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1298  auto chunk = chunks[ci];
1299  const auto cd = chunk->getColumnDesc();
1300  const auto& col_type = cd->columnType;
1301  auto data_buffer = chunk->getBuffer();
1302  auto index_buffer = chunk->getIndexBuf();
1303  auto data_addr = data_buffer->getMemoryPtr();
1304  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1305  auto index_array = (StringOffsetT*)indices_addr;
1306  bool is_varlen = col_type.is_varlen_indeed();
1307 
1308  auto fixlen_vacuum =
1309  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1310  size_t nbytes_fix_data_to_keep;
1311  if (nrows_to_keep == 0) {
1312  nbytes_fix_data_to_keep = 0;
1313  } else {
1314  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1315  }
1316 
1317  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1318  data_buffer->setSize(nbytes_fix_data_to_keep);
1319  data_buffer->setUpdated();
1320 
1321  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1322 
1323  auto daddr = data_addr;
1324  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1325  : get_element_size(col_type);
1326  data_buffer->getEncoder()->resetChunkStats();
1327  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1328  if (col_type.is_fixlen_array()) {
1329  auto encoder =
1330  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1331  CHECK(encoder);
1332  encoder->updateMetadata((int8_t*)daddr);
1333  } else if (col_type.is_fp()) {
1334  set_chunk_stats(col_type,
1335  daddr,
1336  update_stats_per_thread[ci].new_values_stats.has_null,
1337  update_stats_per_thread[ci].new_values_stats.min_double,
1338  update_stats_per_thread[ci].new_values_stats.max_double);
1339  } else {
1340  set_chunk_stats(col_type,
1341  daddr,
1342  update_stats_per_thread[ci].new_values_stats.has_null,
1343  update_stats_per_thread[ci].new_values_stats.min_int64t,
1344  update_stats_per_thread[ci].new_values_stats.max_int64t);
1345  }
1346  }
1347  };
1348 
1349  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1350  size_t nbytes_var_data_to_keep;
1351  if (nrows_to_keep == 0) {
1352  nbytes_var_data_to_keep = 0;
1353  } else {
1354  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1355  }
1356 
1357  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1358  data_buffer->setSize(nbytes_var_data_to_keep);
1359  data_buffer->setUpdated();
1360 
1361  index_buffer->setSize(sizeof(*index_array) *
1362  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1363  index_buffer->setUpdated();
1364 
1365  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1366  };
1367 
1368  if (is_varlen) {
1369  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1370  } else {
1371  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1372  }
1373  if (threads.size() >= (size_t)cpu_threads()) {
1374  wait_cleanup_threads(threads);
1375  }
1376  }
1377 
1378  wait_cleanup_threads(threads);
1379 
1380  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1381  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1382  auto chunk = chunks[ci];
1383  auto cd = chunk->getColumnDesc();
1384  if (!cd->columnType.is_fixlen_array()) {
1385  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1386  // stored in seconds. Do the metadata conversion here before updating the chunk
1387  // stats.
1388  if (cd->columnType.is_date_in_days()) {
1389  auto& stats = update_stats_per_thread[ci].new_values_stats;
1390  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1391  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1392  }
1394  fragment,
1395  chunk,
1396  update_stats_per_thread[ci].new_values_stats,
1397  cd->columnType,
1398  updel_roll);
1399  }
1400  }
1401 }
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:1224
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:222
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
int cpu_threads()
Definition: thread_count.h:25
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 148 of file InsertOrderFragmenter.cpp.

References catalog_(), Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

148  {
149  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
150  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
151  // storage per table
152  if (!uses_foreign_storage_ &&
154  const TableDescriptor* td =
155  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
156  File_Namespace::FileMgrParams fileMgrParams;
157  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
159  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
160  }
161 }
int32_t maxRollbackEpochs
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:606
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.

+ Here is the call graph for this function:

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 936 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::CPU_LEVEL, and Fragmenter_Namespace::FragmentInfo::fragmentId.

937  {
938  // also sets the new fragment as the insertBuffer for each column
939 
940  maxFragmentId_++;
941  auto newFragmentInfo = std::make_unique<FragmentInfo>();
942  newFragmentInfo->fragmentId = maxFragmentId_;
943  newFragmentInfo->shadowNumTuples = 0;
944  newFragmentInfo->setPhysicalNumTuples(0);
945  for (const auto levelSize : dataMgr_->levelSizes_) {
946  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
947  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
948  }
949  newFragmentInfo->physicalTableId = physicalTableId_;
950  newFragmentInfo->shard = shard_;
951 
952  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
953  colMapIt != columnMap_.end();
954  ++colMapIt) {
955  auto& chunk = colMapIt->second;
956  if (memoryLevel == Data_Namespace::MemoryLevel::CPU_LEVEL) {
957  /* At the end of this function chunks from the previous fragment become 'rolled
958  * off', temporaray tables will lose reference to any 'rolled off' chunks and are
959  * not able to unpin these chunks. Keep reference to 'rolled off' chunks and unpin
960  * at ~InsertOrderFragmenter, chunks wrapped by unique_ptr to avoid extraneous
961  * ~Chunk calls with temporary chunks.*/
962  tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
963  }
964  ChunkKey chunkKey = chunkKeyPrefix_;
965  chunkKey.push_back(chunk.getColumnDesc()->columnId);
966  chunkKey.push_back(maxFragmentId_);
967  chunk.createChunkBuffer(dataMgr_,
968  chunkKey,
969  memoryLevel,
970  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
971  pageSize_);
972  chunk.initEncoder();
973  }
974 
976  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
977  return fragmentInfoVec_.back().get();
978 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::vector< std::unique_ptr< Chunk_NS::Chunk > > tracked_in_memory_chunks_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 269 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

269  {
270  // Fix a verified loophole on sharded logical table which is locked using logical
271  // tableId while it's its physical tables that can come here when fragments overflow
272  // during COPY. Locks on a logical table and its physical tables never intersect, which
273  // means potential races. It'll be an overkill to resolve a logical table to physical
274  // tables in DBHandler, ParseNode or other higher layers where the logical table is
275  // locked with Table Read/Write locks; it's easier to lock the logical table of its
276  // physical tables. A downside of this approach may be loss of parallel execution of
277  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
278  // operation, the loss seems not a big deal.
279  auto chunkKeyPrefix = chunkKeyPrefix_;
280  if (shard_ >= 0) {
281  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
282  }
283 
284  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
285  // SELECT and COPY may enter a deadlock
286  const auto delete_lock =
288 
290 
291  for (const auto fragId : dropFragIds) {
292  for (const auto& col : columnMap_) {
293  int colId = col.first;
294  vector<int> fragPrefix = chunkKeyPrefix_;
295  fragPrefix.push_back(colId);
296  fragPrefix.push_back(fragId);
297  dataMgr_->deleteChunksWithPrefix(fragPrefix);
298  }
299  }
300 }
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
std::unique_lock< T > unique_lock
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4769
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 587 of file InsertOrderFragmenter.cpp.

587  {
588  // prevent concurrent insert rows and drop column
590  // synchronize concurrent accesses to fragmentInfoVec_
592  for (auto const& fragmentInfo : fragmentInfoVec_) {
593  fragmentInfo->shadowChunkMetadataMap =
594  fragmentInfo->getChunkMetadataMapPhysicalCopy();
595  }
596 
597  for (const auto columnId : columnIds) {
598  auto cit = columnMap_.find(columnId);
599  if (columnMap_.end() != cit) {
600  columnMap_.erase(cit);
601  }
602 
603  vector<int> fragPrefix = chunkKeyPrefix_;
604  fragPrefix.push_back(columnId);
605  dataMgr_->deleteChunksWithPrefix(fragPrefix);
606 
607  for (const auto& fragmentInfo : fragmentInfoVec_) {
608  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
609  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
610  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
611  }
612  }
613  }
614  for (const auto& fragmentInfo : fragmentInfoVec_) {
615  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
616  }
617 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unique_lock< T > unique_lock
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 236 of file InsertOrderFragmenter.cpp.

236  {
239 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock ( const size_t  max_rows)
private

Definition at line 241 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

241  {
242  // not safe to call from outside insertData
243  // b/c depends on insertLock around numTuples_
244 
245  // don't ever drop the only fragment!
246  if (fragmentInfoVec_.empty() ||
247  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
248  return;
249  }
250 
251  if (numTuples_ > max_rows) {
252  size_t preNumTuples = numTuples_;
253  vector<int> dropFragIds;
254  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
255  while (numTuples_ > targetRows) {
256  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
257  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
258  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
259  fragmentInfoVec_.pop_front();
260  CHECK_GE(numTuples_, numFragTuples);
261  numTuples_ -= numFragTuples;
262  }
263  deleteFragments(dropFragIds);
264  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
265  << " post: " << numTuples_ << " maxRows: " << max_rows;
266  }
267 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:216
#define CHECK_GE(x, y)
Definition: Logger.h:235
#define CHECK_GT(x, y)
Definition: Logger.h:234
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 120 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 163 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

163  {
164  if (uses_foreign_storage_ ||
166  // memory-resident tables won't have anything on disk
167  ChunkMetadataVector chunk_metadata;
169 
170  // data comes like this - database_id, table_id, column_id, fragment_id
171  // but lets sort by database_id, table_id, fragment_id, column_id
172 
173  int fragment_subkey_index = 3;
174  std::sort(chunk_metadata.begin(),
175  chunk_metadata.end(),
176  [&](const auto& pair1, const auto& pair2) {
177  return pair1.first[3] < pair2.first[3];
178  });
179 
180  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
181  ++chunk_itr) {
182  int cur_column_id = chunk_itr->first[2];
183  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
184 
185  if (fragmentInfoVec_.empty() ||
186  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
187  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
188  CHECK(new_fragment_info);
189  maxFragmentId_ = cur_fragment_id;
190  new_fragment_info->fragmentId = cur_fragment_id;
191  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
192  numTuples_ += new_fragment_info->getPhysicalNumTuples();
193  for (const auto level_size : dataMgr_->levelSizes_) {
194  new_fragment_info->deviceIds.push_back(
195  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
196  }
197  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
198  new_fragment_info->physicalTableId = physicalTableId_;
199  new_fragment_info->shard = shard_;
200  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
201  } else {
202  if (chunk_itr->second->numElements !=
203  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
204  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
205  std::to_string(physicalTableId_) + ", Column " +
206  std::to_string(cur_column_id) + ". Fragment Tuples: " +
208  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
209  ", Chunk Tuples: " +
210  std::to_string(chunk_itr->second->numElements);
211  }
212  }
213  CHECK(fragmentInfoVec_.back().get());
214  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
215  }
216  }
217 
218  size_t maxFixedColSize = 0;
219 
220  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
221  auto size = colIt->second.getColumnDesc()->columnType.get_size();
222  if (size == -1) { // variable length
223  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
224  size = 8; // b/c we use this for string and array indices - gross to have magic
225  // number here
226  }
227  CHECK_GE(size, 0);
228  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
229  }
230 
231  // this is maximum number of rows assuming everything is fixed length
232  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
234 }
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:216
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:235
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:466
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor td,
const FragmentInfo fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 989 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

992  {
993  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
994  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
995  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
996  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
997  ++ncol;
998  if (!cd->isVirtualCol) {
999  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1000  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1001  ChunkKey chunk_key{
1002  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1003  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1004  &catalog_->getDataMgr(),
1005  chunk_key,
1006  memory_level,
1007  0,
1008  chunk_meta_it->second->numBytes,
1009  chunk_meta_it->second->numElements);
1010  chunks.push_back(chunk);
1011  }
1012  }
1013  }
1014  return chunks;
1015 }
std::vector< int > ChunkKey
Definition: types.h:36
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:222
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 119 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

119 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 124 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 406 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

406  {
407  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
408  fragmentInfoVec_.end(),
409  [fragment_id](const auto& fragment) -> bool {
410  return fragment->fragmentId == fragment_id;
411  });
412  CHECK(fragment_it != fragmentInfoVec_.end());
413  return fragment_it->get();
414 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the caller graph for this function:

FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside QueryInfo) object all ids and row sizes of fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 985 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

985  {
987  TableInfo queryInfo;
988  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
989  // right now we don't test predicate, so just return (copy of) all fragments
990  bool fragmentsExist = false;
991  if (fragmentInfoVec_.empty()) {
992  // If we have no fragments add a dummy empty fragment to make the executor
993  // not have separate logic for 0-row tables
994  int maxFragmentId = 0;
995  FragmentInfo emptyFragmentInfo;
996  emptyFragmentInfo.fragmentId = maxFragmentId;
997  emptyFragmentInfo.shadowNumTuples = 0;
998  emptyFragmentInfo.setPhysicalNumTuples(0);
999  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
1000  emptyFragmentInfo.physicalTableId = physicalTableId_;
1001  emptyFragmentInfo.shard = shard_;
1002  queryInfo.fragments.push_back(emptyFragmentInfo);
1003  } else {
1004  fragmentsExist = true;
1005  std::for_each(
1006  fragmentInfoVec_.begin(),
1007  fragmentInfoVec_.end(),
1008  [&queryInfo](const auto& fragment_owned_ptr) {
1009  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
1010  });
1011  }
1012  readLock.unlock();
1013  queryInfo.setPhysicalNumTuples(0);
1014  auto partIt = queryInfo.fragments.begin();
1015  if (fragmentsExist) {
1016  while (partIt != queryInfo.fragments.end()) {
1017  if (partIt->getPhysicalNumTuples() == 0) {
1018  // this means that a concurrent insert query inserted tuples into a new fragment
1019  // but when the query came in we didn't have this fragment. To make sure we
1020  // don't mess up the executor we delete this fragment from the metadatamap
1021  // (fixes earlier bug found 2015-05-08)
1022  partIt = queryInfo.fragments.erase(partIt);
1023  } else {
1024  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1025  partIt->getPhysicalNumTuples());
1026  ++partIt;
1027  }
1028  }
1029  } else {
1030  // We added a dummy fragment and know the table is empty
1031  queryInfo.setPhysicalNumTuples(0);
1032  }
1033  return queryInfo;
1034 }
std::vector< int > levelSizes_
Definition: DataMgr.h:229
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock

+ Here is the call graph for this function:

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumFragments ( )
overridevirtual

returns the number of fragments in a table

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 980 of file InsertOrderFragmenter.cpp.

980  {
982  return fragmentInfoVec_.size();
983 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1018 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1019  {
1020  const auto data_buffer = chunk->getBuffer();
1021  const auto data_addr = data_buffer->getMemoryPtr();
1022  const size_t nrows_in_chunk = data_buffer->size();
1023  const size_t ncore = cpu_threads();
1024  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1025  std::vector<std::vector<uint64_t>> deleted_offsets;
1026  deleted_offsets.resize(ncore);
1027  std::vector<std::future<void>> threads;
1028  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1029  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1030  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1031  const auto ithread = rbegin / segsz;
1032  CHECK(ithread < deleted_offsets.size());
1033  deleted_offsets[ithread].reserve(segsz);
1034  for (size_t r = rbegin; r < rend; ++r) {
1035  if (data_addr[r]) {
1036  deleted_offsets[ithread].push_back(r);
1037  }
1038  }
1039  }));
1040  }
1041  wait_cleanup_threads(threads);
1042  std::vector<uint64_t> all_deleted_offsets;
1043  for (size_t i = 0; i < ncore; ++i) {
1044  all_deleted_offsets.insert(
1045  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1046  }
1047  return all_deleted_offsets;
1048 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
#define CHECK(condition)
Definition: Logger.h:222
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 619 of file InsertOrderFragmenter.cpp.

References CHECK.

619  {
621 
622  for (auto const& fragment : fragmentInfoVec_) {
623  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
624  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
625  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
626  if (chunk_stats.max.tinyintval == 1) {
627  return true;
628  }
629  }
630  return false;
631 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
#define CHECK(condition)
Definition: Logger.h:222
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunks ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments.

Parameters
insert_chunk- the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 431 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertChunks::db_id, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertChunks::table_id.

431  {
432  try {
433  // prevent two threads from trying to insert into the same table simultaneously
435  insertChunksImpl(insert_chunk);
436  if (defaultInsertLevel_ ==
437  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
439  chunkKeyPrefix_[0],
440  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
441  }
442  } catch (...) {
443  auto db_id = insert_chunk.db_id;
444  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
445  // the statement below deletes *this* object!
446  // relying on exception propagation at this stage
447  // until we can sort this out in a cleaner fashion
448  catalog_->setTableEpochs(db_id, table_epochs);
449  throw;
450  }
451 }
void insertChunksImpl(const InsertChunks &insert_chunk)
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:545
std::unique_lock< T > unique_lock
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3613
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3585

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksImpl ( const InsertChunks insert_chunk)
protected

Definition at line 703 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::get_num_rows_to_insert(), and Fragmenter_Namespace::InsertChunks::valid_row_indices.

703  {
704  std::optional<int> delete_column_id{std::nullopt};
705  for (const auto& cit : columnMap_) {
706  if (cit.second.getColumnDesc()->isDeletedCol) {
707  delete_column_id = cit.second.getColumnDesc()->columnId;
708  }
709  }
710 
711  // verify that all chunks to be inserted have same number of rows, otherwise the input
712  // data is malformed
713  std::optional<size_t> num_rows{std::nullopt};
714  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
715  auto buffer = chunk->getBuffer();
716  CHECK(buffer);
717  CHECK(buffer->hasEncoder());
718  if (!num_rows.has_value()) {
719  num_rows = buffer->getEncoder()->getNumElems();
720  } else {
721  CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
722  }
723  }
724 
725  auto valid_row_indices = insert_chunks.valid_row_indices;
726  size_t num_rows_left = valid_row_indices.size();
727  size_t num_rows_inserted = 0;
728 
729  if (num_rows_left == 0) {
730  return;
731  }
732 
733  FragmentInfo* current_fragment{nullptr};
734 
735  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
736  // feels fragile
737  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
738  current_fragment = createNewFragment(defaultInsertLevel_);
739  } else {
740  current_fragment = fragmentInfoVec_.back().get();
741  }
742  CHECK(current_fragment);
743 
744  size_t start_fragment = fragmentInfoVec_.size() - 1;
745 
746  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
747  // loop until done inserting all rows
748  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
749  size_t rows_left_in_current_fragment =
750  maxFragmentRows_ - current_fragment->shadowNumTuples;
751  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
752  num_rows_left,
753  num_rows_inserted,
756  insert_chunks,
757  columnMap_,
758  valid_row_indices);
759 
760  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
761  current_fragment = createNewFragment(defaultInsertLevel_);
762  if (num_rows_inserted == 0) {
763  start_fragment++;
764  }
765  rows_left_in_current_fragment = maxFragmentRows_;
766  for (auto& varLenColInfoIt : varLenColInfo_) {
767  varLenColInfoIt.second = 0; // reset byte counter
768  }
769  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
770  num_rows_left,
771  num_rows_inserted,
772  varLenColInfo_,
774  insert_chunks,
775  columnMap_,
776  valid_row_indices);
777  }
778 
779  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
780  // never be able to insert anything
781 
782  insertChunksIntoFragment(insert_chunks,
783  delete_column_id,
784  current_fragment,
785  num_rows_to_insert,
786  num_rows_inserted,
787  num_rows_left,
788  valid_row_indices,
789  start_fragment);
790  }
791  numTuples_ += *num_rows;
793 }
void insertChunksIntoFragment(const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:234
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment, const size_t num_rows_left, const size_t num_rows_inserted, const std::unordered_map< int, size_t > &var_len_col_info, const size_t max_chunk_size, const InsertChunks &insert_chunks, std::map< int, Chunk_NS::Chunk > &column_map, const std::vector< size_t > &valid_row_indices)

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksIntoFragment ( const InsertChunks insert_chunks,
const std::optional< int >  delete_column_id,
FragmentInfo current_fragment,
const size_t  num_rows_to_insert,
size_t &  num_rows_inserted,
size_t &  num_rows_left,
std::vector< size_t > &  valid_row_indices,
const size_t  start_fragment 
)
private

Definition at line 633 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GE, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

641  {
643  // for each column, append the data in the appropriate insert buffer
644  auto insert_row_indices = valid_row_indices;
645  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
646  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
647  insert_row_indices.end());
648  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
649  for (auto& [column_id, chunk] : insert_chunks.chunks) {
650  auto col_map_it = columnMap_.find(column_id);
651  CHECK(col_map_it != columnMap_.end());
652  current_fragment->shadowChunkMetadataMap[column_id] =
653  col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
654  auto var_len_col_info_it = varLenColInfo_.find(column_id);
655  if (var_len_col_info_it != varLenColInfo_.end()) {
656  var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
657  CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
658  }
659  }
660  if (hasMaterializedRowId_) {
661  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
662  current_fragment->shadowNumTuples;
663  std::vector<int64_t> row_id_data(num_rows_to_insert);
664  for (size_t i = 0; i < num_rows_to_insert; ++i) {
665  row_id_data[i] = i + start_id;
666  }
667  DataBlockPtr row_id_block;
668  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
669  auto col_map_it = columnMap_.find(rowIdColId_);
670  CHECK(col_map_it != columnMap_.end());
671  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
672  row_id_block, num_rows_to_insert, num_rows_inserted);
673  }
674 
675  if (delete_column_id) { // has delete column
676  std::vector<int8_t> delete_data(num_rows_to_insert, false);
677  DataBlockPtr delete_block;
678  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
679  auto col_map_it = columnMap_.find(*delete_column_id);
680  CHECK(col_map_it != columnMap_.end());
681  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
682  col_map_it->second.appendData(
683  delete_block, num_rows_to_insert, num_rows_inserted);
684  }
685 
686  current_fragment->shadowNumTuples =
687  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
688  num_rows_left -= num_rows_to_insert;
689  num_rows_inserted += num_rows_to_insert;
690  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
691  part_it != fragmentInfoVec_.end();
692  ++part_it) {
693  auto fragment_ptr = part_it->get();
694  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
695  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
696  }
697 
698  // truncate the first `num_rows_to_insert` rows in `valid_row_indices`
699  valid_row_indices.erase(valid_row_indices.begin(),
700  valid_row_indices.begin() + num_rows_to_insert);
701 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GE(x, y)
Definition: Logger.h:235
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::unique_lock< T > unique_lock
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:221
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksNoCheckpoint ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments; no locks or checkpoints taken.

Parameters
chunk- the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 479 of file InsertOrderFragmenter.cpp.

479  {
480  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
482  insertMutex_); // prevent two threads from trying to insert into the same table
483  // simultaneously
484  insertChunksImpl(insert_chunk);
485 }
void insertChunksImpl(const InsertChunks &insert_chunk)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData insert_data_struct)
overridevirtual

appends data onto the most recently occuring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 453 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

453  {
454  try {
455  // prevent two threads from trying to insert into the same table simultaneously
457  if (!isAddingNewColumns(insert_data_struct)) {
458  insertDataImpl(insert_data_struct);
459  } else {
460  addColumns(insert_data_struct);
461  }
462  if (defaultInsertLevel_ ==
463  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
465  chunkKeyPrefix_[0],
466  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
467  }
468  } catch (...) {
469  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
470  insert_data_struct.tableId);
471  // the statement below deletes *this* object!
472  // relying on exception propagation at this stage
473  // until we can sort this out in a cleaner fashion
474  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
475  throw;
476  }
477 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:545
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3613
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3585

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData insert_data)
protected

Definition at line 795 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::is_default, DataBlockPtr::numbersPtr, and Fragmenter_Namespace::InsertData::numRows.

795  {
796  // populate deleted system column if it should exist, as it will not come from client
797  std::unique_ptr<int8_t[]> data_for_deleted_column;
798  for (const auto& cit : columnMap_) {
799  if (cit.second.getColumnDesc()->isDeletedCol) {
800  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
801  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
802  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
803  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
804  insert_data.is_default.push_back(false);
805  break;
806  }
807  }
808  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
809  std::unordered_map<int, int> inverseInsertDataColIdMap;
810  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
811  inverseInsertDataColIdMap.insert(
812  std::make_pair(insert_data.columnIds[insertId], insertId));
813  }
814 
815  size_t numRowsLeft = insert_data.numRows;
816  size_t numRowsInserted = 0;
817  vector<DataBlockPtr> dataCopy =
818  insert_data.data; // bc append data will move ptr forward and this violates
819  // constness of InsertData
820  if (numRowsLeft <= 0) {
821  return;
822  }
823 
824  FragmentInfo* currentFragment{nullptr};
825 
826  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
827  // feels fragile
828  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
829  currentFragment = createNewFragment(defaultInsertLevel_);
830  } else {
831  currentFragment = fragmentInfoVec_.back().get();
832  }
833  CHECK(currentFragment);
834 
835  size_t startFragment = fragmentInfoVec_.size() - 1;
836 
837  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
838  // loop until done inserting all rows
839  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
840  size_t rowsLeftInCurrentFragment =
841  maxFragmentRows_ - currentFragment->shadowNumTuples;
842  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
843  if (rowsLeftInCurrentFragment != 0) {
844  for (auto& varLenColInfoIt : varLenColInfo_) {
845  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
846  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
847  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
848  if (insertIdIt != inverseInsertDataColIdMap.end()) {
849  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
850  numRowsToInsert = std::min(numRowsToInsert,
851  colMapIt->second.getNumElemsForBytesInsertData(
852  dataCopy[insertIdIt->second],
853  numRowsToInsert,
854  numRowsInserted,
855  bytesLeft,
856  insert_data.is_default[insertIdIt->second]));
857  }
858  }
859  }
860 
861  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
862  currentFragment = createNewFragment(defaultInsertLevel_);
863  if (numRowsInserted == 0) {
864  startFragment++;
865  }
866  rowsLeftInCurrentFragment = maxFragmentRows_;
867  for (auto& varLenColInfoIt : varLenColInfo_) {
868  varLenColInfoIt.second = 0; // reset byte counter
869  }
870  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
871  for (auto& varLenColInfoIt : varLenColInfo_) {
872  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
873  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
874  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
875  if (insertIdIt != inverseInsertDataColIdMap.end()) {
876  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
877  numRowsToInsert = std::min(numRowsToInsert,
878  colMapIt->second.getNumElemsForBytesInsertData(
879  dataCopy[insertIdIt->second],
880  numRowsToInsert,
881  numRowsInserted,
882  bytesLeft,
883  insert_data.is_default[insertIdIt->second]));
884  }
885  }
886  }
887 
888  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
889  // never be able to insert anything
890 
891  {
893  // for each column, append the data in the appropriate insert buffer
894  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
895  int columnId = insert_data.columnIds[i];
896  auto colMapIt = columnMap_.find(columnId);
897  CHECK(colMapIt != columnMap_.end());
898  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
899  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
900  auto varLenColInfoIt = varLenColInfo_.find(columnId);
901  if (varLenColInfoIt != varLenColInfo_.end()) {
902  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
903  }
904  }
905  if (hasMaterializedRowId_) {
906  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
907  currentFragment->shadowNumTuples;
908  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
909  for (size_t i = 0; i < numRowsToInsert; ++i) {
910  row_id_data[i] = i + startId;
911  }
912  DataBlockPtr rowIdBlock;
913  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
914  auto colMapIt = columnMap_.find(rowIdColId_);
915  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
916  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
917  }
918 
919  currentFragment->shadowNumTuples =
920  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
921  numRowsLeft -= numRowsToInsert;
922  numRowsInserted += numRowsToInsert;
923  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
924  partIt != fragmentInfoVec_.end();
925  ++partIt) {
926  auto fragment_ptr = partIt->get();
927  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
928  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
929  }
930  }
931  }
932  numTuples_ += insert_data.numRows;
934 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::unique_lock< T > unique_lock
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:221
void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData insert_data_struct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and checkpoints taken needs to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 487 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

487  {
488  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
490  insertMutex_); // prevent two threads from trying to insert into the same table
491  // simultaneously
492  if (!isAddingNewColumns(insert_data_struct)) {
493  insertDataImpl(insert_data_struct);
494  } else {
495  addColumns(insert_data_struct);
496  }
497 }
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const

+ Here is the caller graph for this function:

bool Fragmenter_Namespace::InsertOrderFragmenter::isAddingNewColumns ( const InsertData insert_data) const
private

Definition at line 416 of file InsertOrderFragmenter.cpp.

References CHECK, and Fragmenter_Namespace::InsertData::columnIds.

416  {
417  bool all_columns_already_exist = true, all_columns_are_new = true;
418  for (const auto column_id : insert_data.columnIds) {
419  if (columnMap_.find(column_id) == columnMap_.end()) {
420  all_columns_already_exist = false;
421  } else {
422  all_columns_are_new = false;
423  }
424  }
425  // only one should be TRUE
426  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
427  CHECK(either_all_exist_or_all_new);
428  return all_columns_are_new;
429 }
#define CHECK(condition)
Definition: Logger.h:222
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::resetSizesFromFragments ( )
overridevirtual

Resets the fragmenter's size related metadata using the internal fragment info vector. This is typically done after operations, such as vacuuming, which can change fragment sizes.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1036 of file InsertOrderFragmenter.cpp.

1036  {
1038  numTuples_ = 0;
1039  for (const auto& fragment_info : fragmentInfoVec_) {
1040  numTuples_ += fragment_info->getPhysicalNumTuples();
1041  }
1043 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
void Fragmenter_Namespace::InsertOrderFragmenter::setLastFragmentVarLenColumnSizes ( )
private

Definition at line 1045 of file InsertOrderFragmenter.cpp.

1045  {
1046  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1047  // Now need to get the insert buffers for each column - should be last
1048  // fragment
1049  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1050  // TODO: add accessor here for safe indexing
1051  int deviceId =
1052  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1053  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1054  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1055  insertKey.push_back(colIt->first); // column id
1056  insertKey.push_back(lastFragmentId); // fragment id
1057  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1058  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1059  if (varLenColInfoIt != varLenColInfo_.end()) {
1060  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1061  }
1062  }
1063  }
1064 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor cd,
std::unordered_map< int, ChunkStats > &  stats_map,
std::optional< Data_Namespace::MemoryLevel memory_level 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 315 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

318  {
319  // synchronize concurrent accesses to fragmentInfoVec_
326  if (shard_ >= 0) {
327  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
328  }
329 
330  CHECK(cd);
331  const auto column_id = cd->columnId;
332  const auto col_itr = columnMap_.find(column_id);
333  CHECK(col_itr != columnMap_.end());
334 
335  for (auto const& fragment : fragmentInfoVec_) {
336  auto stats_itr = stats_map.find(fragment->fragmentId);
337  if (stats_itr != stats_map.end()) {
338  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
339  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
340  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
342  column_id,
343  fragment->fragmentId};
344  auto chunk = Chunk_NS::Chunk::getChunk(cd,
345  &catalog_->getDataMgr(),
346  chunk_key,
347  memory_level.value_or(defaultInsertLevel_),
348  0,
349  chunk_meta_it->second->numBytes,
350  chunk_meta_it->second->numElements);
351  auto buf = chunk->getBuffer();
352  CHECK(buf);
353  if (!buf->hasEncoder()) {
354  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
355  }
356  auto encoder = buf->getEncoder();
357 
358  auto chunk_stats = stats_itr->second;
359 
360  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
361  encoder->getMetadata(old_chunk_metadata);
362  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
363 
364  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
365  // Use the logical type to display data, since the encoding should be ignored
366  const auto logical_ti = cd->columnType.is_dict_encoded_string()
368  : get_logical_type_info(cd->columnType);
369  if (!didResetStats) {
370  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
371  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
372  << DatumToString(chunk_stats.max, logical_ti);
373  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
374  << DatumToString(chunk_stats.min, logical_ti);
375  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
376  continue; // move to next fragment
377  }
378 
379  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
380  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
381  << DatumToString(chunk_stats.max, logical_ti);
382  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
383  << DatumToString(chunk_stats.min, logical_ti);
384  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
385 
386  // Reset fragment metadata map and set buffer to dirty
387  auto new_metadata = std::make_shared<ChunkMetadata>();
388  // Run through fillChunkStats to ensure any transformations to the raw metadata
389  // values get applied (e.g. for date in days)
390  encoder->getMetadata(new_metadata);
391 
392  fragment->setChunkMetadata(column_id, new_metadata);
393  fragment->shadowChunkMetadataMap =
394  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
396  buf->setDirty();
397  }
398  } else {
399  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
400  << ", table " << physicalTableId_ << ", "
401  << ", column " << column_id;
402  }
403  }
404 }
std::vector< int > ChunkKey
Definition: types.h:36
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:458
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
#define LOG(tag)
Definition: Logger.h:216
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1198
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
std::unique_lock< T > unique_lock
#define CHECK(condition)
Definition: Logger.h:222
bool is_dict_encoded_string() const
Definition: sqltypes.h:627
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:316

+ Here is the call graph for this function:

std::optional< ChunkUpdateStats > Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 640 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, CHECK_GT, Fragmenter_Namespace::ChunkUpdateStats::chunk, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), Fragmenter_Namespace::ChunkUpdateStats::fragment_rows_count, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::ChunkUpdateStats::new_values_stats, Fragmenter_Namespace::ChunkUpdateStats::old_values_stats, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, TableDescriptor::tableId, temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, Fragmenter_Namespace::anonymous_namespace{UpdelStorage.cpp}::update_metadata(), foreign_storage::update_stats(), updateColumnMetadata(), Fragmenter_Namespace::ChunkUpdateStats::updated_rows_count, NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

649  {
650  updel_roll.catalog = catalog;
651  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
652  updel_roll.memoryLevel = memory_level;
653 
654  const size_t ncore = cpu_threads();
655  const auto nrow = frag_offsets.size();
656  const auto n_rhs_values = rhs_values.size();
657  if (0 == nrow) {
658  return {};
659  }
660  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
661 
662  auto fragment_ptr = getFragmentInfo(fragment_id);
663  auto& fragment = *fragment_ptr;
664  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
665  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
666  ChunkKey chunk_key{
667  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
668  auto chunk = Chunk_NS::Chunk::getChunk(cd,
669  &catalog->getDataMgr(),
670  chunk_key,
672  0,
673  chunk_meta_it->second->numBytes,
674  chunk_meta_it->second->numElements);
675 
676  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
677 
678  // parallel update elements
679  std::vector<std::future<void>> threads;
680 
681  const auto segsz = (nrow + ncore - 1) / ncore;
682  auto dbuf = chunk->getBuffer();
683  auto dbuf_addr = dbuf->getMemoryPtr();
684  dbuf->setUpdated();
685  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
686  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
687  threads.emplace_back(std::async(
688  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
689  SQLTypeInfo lhs_type = cd->columnType;
690 
691  // !! not sure if this is a undocumented convention or a bug, but for a sharded
692  // table the dictionary id of a encoded string column is not specified by
693  // comp_param in physical table but somehow in logical table :) comp_param in
694  // physical table is always 0, so need to adapt accordingly...
695  auto cdl = (shard_ < 0)
696  ? cd
697  : catalog->getMetadataForColumn(
698  catalog->getLogicalTableId(td->tableId), cd->columnId);
699  CHECK(cdl);
700  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
701  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
702  lhs_type, &decimalOverflowValidator);
703  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
704  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
705  lhs_type, &dateDaysOverflowValidator);
706 
707  StringDictionary* stringDict{nullptr};
708  if (lhs_type.is_string()) {
709  CHECK(kENCODING_DICT == lhs_type.get_compression());
710  auto dictDesc = const_cast<DictDescriptor*>(
711  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
712  CHECK(dictDesc);
713  stringDict = dictDesc->stringDict.get();
714  CHECK(stringDict);
715  }
716 
717  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
718  const auto roffs = frag_offsets[r];
719  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
720  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
721  ScalarTargetValue sv2;
722 
723  // Subtle here is on the two cases of string-to-string assignments, when
724  // upstream passes RHS string as a string index instead of a preferred "real
725  // string".
726  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
727  // index
728  // in this layer, so if upstream passes a str idx here, an
729  // exception is thrown.
730  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
731  // str idx.
732  if (rhs_type.is_string()) {
733  if (const auto vp = boost::get<int64_t>(sv)) {
734  auto dictDesc = const_cast<DictDescriptor*>(
735  catalog->getMetadataForDict(rhs_type.get_comp_param()));
736  if (nullptr == dictDesc) {
737  throw std::runtime_error(
738  "UPDATE does not support cast from string literal to string "
739  "column.");
740  }
741  auto stringDict = dictDesc->stringDict.get();
742  CHECK(stringDict);
743  sv2 = NullableString(stringDict->getString(*vp));
744  sv = &sv2;
745  }
746  }
747 
748  if (const auto vp = boost::get<int64_t>(sv)) {
749  auto v = *vp;
750  if (lhs_type.is_string()) {
751  throw std::runtime_error("UPDATE does not support cast to string.");
752  }
753  int64_t old_val;
754  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
755  // Handle special case where date column with date in days encoding stores
756  // metadata in epoch seconds.
757  if (lhs_type.is_date_in_days()) {
759  }
760  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
761  if (lhs_type.is_decimal()) {
762  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
763  int64_t decimal_val;
764  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
765  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
766  lhs_type.get_notnull() == false)
767  ? v
768  : decimal_val;
770  lhs_type, update_stats_per_thread[c], target_value, old_val);
771  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
772  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
773  if (positive_v_and_negative_d || negative_v_and_positive_d) {
774  throw std::runtime_error(
775  "Data conversion overflow on " + std::to_string(v) +
776  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
777  std::to_string(rhs_type.get_scale()) + ") to (" +
778  std::to_string(lhs_type.get_dimension()) + ", " +
779  std::to_string(lhs_type.get_scale()) + ")");
780  }
781  } else if (is_integral(lhs_type)) {
782  if (lhs_type.is_date_in_days()) {
783  // Store meta values in seconds
784  if (lhs_type.get_size() == 2) {
785  nullAwareDateOverflowValidator.validate<int16_t>(v);
786  } else {
787  nullAwareDateOverflowValidator.validate<int32_t>(v);
788  }
789  int64_t days;
790  get_scalar<int64_t>(data_ptr, lhs_type, days);
791  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
792  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
793  lhs_type.get_notnull() == false)
794  ? NullSentinelSupplier()(lhs_type, v)
795  : seconds;
797  lhs_type, update_stats_per_thread[c], target_value, old_val);
798  } else {
799  int64_t target_value;
800  if (rhs_type.is_decimal()) {
801  target_value = round(decimal_to_double(rhs_type, v));
802  } else {
803  target_value = v;
804  }
806  lhs_type, update_stats_per_thread[c], target_value, old_val);
807  }
808  } else {
809  if (rhs_type.is_decimal()) {
810  update_metadata(lhs_type,
811  update_stats_per_thread[c],
812  decimal_to_double(rhs_type, v),
813  double(old_val));
814  } else {
815  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
816  }
817  }
818  } else if (const auto vp = boost::get<double>(sv)) {
819  auto v = *vp;
820  if (lhs_type.is_string()) {
821  throw std::runtime_error("UPDATE does not support cast to string.");
822  }
823  double old_val;
824  get_scalar<double>(data_ptr, lhs_type, old_val);
825  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
826  if (lhs_type.is_integer()) {
828  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
829  } else if (lhs_type.is_fp()) {
831  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
832  } else {
833  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
834  "LHS with a floating RHS.";
835  }
836  } else if (const auto vp = boost::get<float>(sv)) {
837  auto v = *vp;
838  if (lhs_type.is_string()) {
839  throw std::runtime_error("UPDATE does not support cast to string.");
840  }
841  float old_val;
842  get_scalar<float>(data_ptr, lhs_type, old_val);
843  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
844  if (lhs_type.is_integer()) {
846  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
847  } else {
848  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
849  }
850  } else if (const auto vp = boost::get<NullableString>(sv)) {
851  const auto s = boost::get<std::string>(vp);
852  const auto sval = s ? *s : std::string("");
853  if (lhs_type.is_string()) {
854  decltype(stringDict->getOrAdd(sval)) sidx;
855  {
856  std::unique_lock<std::mutex> lock(temp_mutex_);
857  sidx = stringDict->getOrAdd(sval);
858  }
859  int64_t old_val;
860  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
861  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
863  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
864  } else if (sval.size() > 0) {
865  auto dval = std::atof(sval.data());
866  if (lhs_type.is_boolean()) {
867  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
868  } else if (lhs_type.is_time()) {
869  throw std::runtime_error(
870  "Date/Time/Timestamp update not supported through translated "
871  "string path.");
872  }
873  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
874  double old_val;
875  get_scalar<double>(data_ptr, lhs_type, old_val);
876  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
878  lhs_type, update_stats_per_thread[c], double(dval), old_val);
879  } else {
880  int64_t old_val;
881  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
882  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
884  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
885  }
886  } else {
887  put_null(data_ptr, lhs_type, cd->columnName);
888  update_stats_per_thread[c].new_values_stats.has_null = true;
889  }
890  } else {
891  CHECK(false);
892  }
893  }
894  }));
895  if (threads.size() >= (size_t)cpu_threads()) {
896  wait_cleanup_threads(threads);
897  }
898  }
899  wait_cleanup_threads(threads);
900 
901  // for unit test
903  if (cd->isDeletedCol) {
904  const auto deleted_offsets = getVacuumOffsets(chunk);
905  if (deleted_offsets.size() > 0) {
906  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
907  return {};
908  }
909  }
910  }
911  ChunkUpdateStats update_stats;
912  for (size_t c = 0; c < ncore; ++c) {
913  update_metadata(update_stats.new_values_stats,
914  update_stats_per_thread[c].new_values_stats);
915  update_metadata(update_stats.old_values_stats,
916  update_stats_per_thread[c].old_values_stats);
917  }
918 
919  CHECK_GT(fragment.shadowNumTuples, size_t(0));
921  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
922  update_stats.updated_rows_count = nrow;
923  update_stats.fragment_rows_count = fragment.shadowNumTuples;
924  update_stats.chunk = chunk;
925  return update_stats;
926 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
std::vector< int > ChunkKey
Definition: types.h:36
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:243
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:384
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GT(x, y)
Definition: Logger.h:234
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4769
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:387
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:381
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:179
int logicalTableId
Definition: UpdelRoll.h:54
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:388
#define CHECK(condition)
Definition: Logger.h:222
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
Descriptor for a dictionary for a string columne.
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:575
int cpu_threads()
Definition: thread_count.h:25
bool is_decimal() const
Definition: sqltypes.h:578
std::string columnName
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue rhs_value,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 55 of file UpdelStorage.cpp.

References updateColumn().

63  {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata metadata 
)
overridevirtual

Updates the metadata for a column chunk.

Parameters
cd- ColumnDescriptor for the column
fragment_id- Fragment id of the chunk within the column
metadata- shared_ptr of the metadata to update column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 302 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

305  {
306  // synchronize concurrent accesses to fragmentInfoVec_
308 
309  CHECK(metadata.get());
310  auto fragment_info = getFragmentInfo(fragment_id);
311  CHECK(fragment_info);
312  fragment_info->setChunkMetadata(cd->columnId, metadata);
313 }
std::unique_lock< T > unique_lock
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:222
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor cd,
FragmentInfo fragment,
std::shared_ptr< Chunk_NS::Chunk chunk,
const UpdateValuesStats update_values_stats,
const SQLTypeInfo rhs_type,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 928 of file UpdelStorage.cpp.

References UpdelRoll::catalog, ColumnDescriptor::columnId, ColumnDescriptor::columnType, fragmentInfoMutex_, UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getMetadataForTable(), Fragmenter_Namespace::UpdateValuesStats::has_null, SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, Fragmenter_Namespace::UpdateValuesStats::max_double, Fragmenter_Namespace::UpdateValuesStats::max_int64t, Fragmenter_Namespace::UpdateValuesStats::min_double, Fragmenter_Namespace::UpdateValuesStats::min_int64t, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

934  {
936  auto buffer = chunk->getBuffer();
937  const auto& lhs_type = cd->columnType;
938 
939  auto encoder = buffer->getEncoder();
940  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
941  static_assert(std::is_same<decltype(min), decltype(max)>::value,
942  "Type mismatch on min/max");
943  if (has_null) {
944  encoder->updateStats(decltype(min)(), true);
945  }
946  if (max < min) {
947  return;
948  }
949  encoder->updateStats(min, false);
950  encoder->updateStats(max, false);
951  };
952 
953  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
954  update_stats(new_values_stats.min_int64t,
955  new_values_stats.max_int64t,
956  new_values_stats.has_null);
957  } else if (lhs_type.is_fp()) {
958  update_stats(new_values_stats.min_double,
959  new_values_stats.max_double,
960  new_values_stats.has_null);
961  } else if (lhs_type.is_decimal()) {
962  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
963  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
964  new_values_stats.has_null);
965  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
966  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
967  update_stats(new_values_stats.min_int64t,
968  new_values_stats.max_int64t,
969  new_values_stats.has_null);
970  }
971  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
972  auto chunk_metadata =
973  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
974  buffer->getEncoder()->getMetadata(chunk_metadata);
975 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
heavyai::unique_lock< heavyai::shared_mutex > write_lock
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
std::unique_lock< T > unique_lock
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:578
bool is_integral(const SQLTypeInfo &t)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const int  fragmentId,
const std::vector< TargetMetaInfo sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll updelRoll,
Executor executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 268 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::InsertData::columnIds, cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, g_enable_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), Fragmenter_Namespace::InsertData::is_default, SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

278  {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
326  num_rows,
327  *catalog,
328  sourceDataMetaInfo,
329  targetDescriptor,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().get_comp_param(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr,
340  nullptr};
341  auto converter = factory.create(param);
342  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
343 
344  if (targetDescriptor->columnType.is_geometry()) {
345  // geometry columns are composites
346  // need to skip chunks, depending on geo type
347  switch (targetDescriptor->columnType.get_type()) {
348  case kMULTIPOLYGON:
349  indexOfChunk += 5;
350  break;
351  case kPOLYGON:
352  indexOfChunk += 4;
353  break;
354  case kMULTILINESTRING:
355  indexOfChunk += 3;
356  break;
357  case kLINESTRING:
358  case kMULTIPOINT:
359  indexOfChunk += 2;
360  break;
361  case kPOINT:
362  indexOfChunk += 1;
363  break;
364  default:
365  CHECK(false); // not supported
366  }
367  }
368  } else {
369  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
370  std::unique_ptr<ChunkToInsertDataConverter> converter;
371 
372  if (chunk_cd->columnType.is_fixlen_array()) {
373  converter =
374  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
375  } else if (chunk_cd->columnType.is_string()) {
376  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
377  } else if (chunk_cd->columnType.is_geometry()) {
378  // the logical geo column is a string column
379  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
380  } else {
381  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
382  }
383 
384  chunkConverters.push_back(std::move(converter));
385 
386  } else if (chunk_cd->columnType.is_date_in_days()) {
387  /* Q: Why do we need this?
388  A: In variable length updates path we move the chunk content of column
389  without decoding. Since it again passes through DateDaysEncoder
390  the expected value should be in seconds, but here it will be in days.
391  Therefore, using DateChunkConverter chunk values are being scaled to
392  seconds which then ultimately encoded in days in DateDaysEncoder.
393  */
394  std::unique_ptr<ChunkToInsertDataConverter> converter;
395  const size_t physical_size = chunk_cd->columnType.get_size();
396  if (physical_size == 2) {
397  converter =
398  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
399  } else if (physical_size == 4) {
400  converter =
401  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
402  } else {
403  CHECK(false);
404  }
405  chunkConverters.push_back(std::move(converter));
406  } else {
407  std::unique_ptr<ChunkToInsertDataConverter> converter;
408  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
409  int logical_size = logical_type.get_size();
410  int physical_size = chunk_cd->columnType.get_size();
411 
412  if (logical_type.is_string()) {
413  // for dicts -> logical = physical
414  logical_size = physical_size;
415  }
416 
417  if (8 == physical_size) {
418  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
419  num_rows, chunk.get());
420  } else if (4 == physical_size) {
421  if (8 == logical_size) {
422  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
423  num_rows, chunk.get());
424  } else {
425  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
426  num_rows, chunk.get());
427  }
428  } else if (2 == chunk_cd->columnType.get_size()) {
429  if (8 == logical_size) {
430  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
431  num_rows, chunk.get());
432  } else if (4 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
434  num_rows, chunk.get());
435  } else {
436  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
437  num_rows, chunk.get());
438  }
439  } else if (1 == chunk_cd->columnType.get_size()) {
440  if (8 == logical_size) {
441  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
442  num_rows, chunk.get());
443  } else if (4 == logical_size) {
444  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
445  num_rows, chunk.get());
446  } else if (2 == logical_size) {
447  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
448  num_rows, chunk.get());
449  } else {
450  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
451  num_rows, chunk.get());
452  }
453  } else {
454  CHECK(false); // unknown
455  }
456 
457  chunkConverters.push_back(std::move(converter));
458  }
459  }
460  }
461 
462  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
463  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
464 
465  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
466  bool* deletedChunkBuffer =
467  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
468 
469  std::atomic<size_t> row_idx{0};
470 
471  auto row_converter = [&sourceDataProvider,
472  &sourceDataConverters,
473  &indexOffFragmentOffsetColumn,
474  &chunkConverters,
475  &deletedChunkBuffer,
476  &row_idx](size_t indexOfEntry) -> void {
477  // convert the source data
478  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
479  if (row.empty()) {
480  return;
481  }
482 
483  size_t indexOfRow = row_idx.fetch_add(1);
484 
485  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
486  if (sourceDataConverters[col]) {
487  const auto& mapd_variant = row[col];
488  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
489  }
490  }
491 
492  auto scalar = checked_get(
493  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
494  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
495 
496  // convert the remaining chunks
497  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
498  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
499  }
500 
501  // now mark the row as deleted
502  deletedChunkBuffer[indexInChunkBuffer] = true;
503  };
504 
505  bool can_go_parallel = num_rows > 20000;
506 
507  if (can_go_parallel) {
508  const size_t num_worker_threads = cpu_threads();
509  std::vector<std::future<void>> worker_threads;
510  for (size_t i = 0,
511  start_entry = 0,
512  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
513  i < num_worker_threads && start_entry < num_entries;
514  ++i, start_entry += stride) {
515  const auto end_entry = std::min(start_entry + stride, num_rows);
516  worker_threads.push_back(std::async(
518  [&row_converter](const size_t start, const size_t end) {
519  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
520  row_converter(indexOfRow);
521  }
522  },
523  start_entry,
524  end_entry));
525  }
526 
527  for (auto& child : worker_threads) {
528  child.wait();
529  }
530 
531  } else {
532  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
533  row_converter(entryIdx);
534  }
535  }
536 
538  insert_data.databaseId = catalog->getCurrentDB().dbId;
539  insert_data.tableId = td->tableId;
540 
541  for (size_t i = 0; i < chunkConverters.size(); i++) {
542  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
543  continue;
544  }
545 
546  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
547  if (sourceDataConverters[i]) {
548  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
549  }
550  continue;
551  }
552 
553  insert_data.numRows = num_rows;
554  insert_data.is_default.resize(insert_data.columnIds.size(), false);
555  insertDataNoCheckpoint(insert_data);
556 
557  // update metdata for deleted chunk as we are doing special handling
558  auto chunkMetadata =
559  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
560  chunkMetadata->chunkStats.max.boolval = 1;
561 
562  // Im not completely sure that we need to do this in fragmented and on the buffer
563  // but leaving this alone for now
564  if (!deletedChunk->getBuffer()->hasEncoder()) {
565  deletedChunk->initEncoder();
566  }
567  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
568 
569  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
570  // An append to the same fragment will increase shadowNumTuples.
571  // Update NumElems in this case. Otherwise, use existing NumElems.
572  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
573  }
574  deletedChunk->getBuffer()->setUpdated();
575 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
bool is_varlen_update
Definition: UpdelRoll.h:57
HOST DEVICE int get_size() const
Definition: sqltypes.h:389
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1198
std::vector< bool > is_default
Definition: Fragmenter.h:75
bool g_enable_string_functions
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:72
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
future< Result > async(Fn &&fn, Args &&...args)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4769
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:54
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
#define CHECK(condition)
Definition: Logger.h:222
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool is_string() const
Definition: sqltypes.h:575
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:25
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog catalog,
const MetaDataKey key,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 977 of file UpdelStorage.cpp.

References fragmentInfoMutex_, UpdelRoll::getChunkMetadataMap(), and UpdelRoll::getNumTuple().

979  {
981  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
982  auto& fragmentInfo = *key.second;
983  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
984  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
985  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
986  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
987 }
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
std::unique_lock< T > unique_lock
size_t getNumTuple(const MetaDataKey &key) const

+ Here is the call graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1081 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1084  {
1085  const auto cd = chunk->getColumnDesc();
1086  const auto& col_type = cd->columnType;
1087  auto data_buffer = chunk->getBuffer();
1088  auto data_addr = data_buffer->getMemoryPtr();
1089  auto element_size =
1090  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1091  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1092  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1093  size_t nbytes_fix_data_to_keep = 0;
1094  auto nrows_to_vacuum = frag_offsets.size();
1095  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1096  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1097  auto is_last_one = irow == nrows_to_vacuum;
1098  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1099  auto maddr_to_vacuum = data_addr;
1100  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1101  if (nrows_to_keep > 0) {
1102  auto nbytes_to_keep = nrows_to_keep * element_size;
1103  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1104  // move curr fixlen row block toward front
1105  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1106  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1107  nbytes_to_keep);
1108  }
1109  irow_of_blk_to_fill += nrows_to_keep;
1110  nbytes_fix_data_to_keep += nbytes_to_keep;
1111  }
1112  irow_of_blk_to_keep = irow_to_vacuum + 1;
1113  }
1114  return nbytes_fix_data_to_keep;
1115 }

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1195 of file UpdelStorage.cpp.

References CHECK, Fragmenter_Namespace::get_buffer_offset(), Fragmenter_Namespace::get_null_padding(), Fragmenter_Namespace::get_var_len_null_array_indexes(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1198  {
1199  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1200  auto data_buffer = chunk->getBuffer();
1201  CHECK(data_buffer);
1202  auto index_buffer = chunk->getIndexBuf();
1203  CHECK(index_buffer);
1204  auto data_addr = data_buffer->getMemoryPtr();
1205  auto indices_addr = index_buffer->getMemoryPtr();
1206  CHECK(indices_addr);
1207  auto index_array = (StringOffsetT*)indices_addr;
1208  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1209  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1210  size_t nbytes_fix_data_to_keep = 0;
1211  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1212  size_t null_padding =
1213  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1214  size_t nbytes_var_data_to_keep = null_padding;
1215  auto null_array_indexes = get_var_len_null_array_indexes(
1216  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1217  auto nrows_to_vacuum = frag_offsets.size();
1218  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1219  auto is_last_one = irow == nrows_to_vacuum;
1220  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1221  auto maddr_to_vacuum = data_addr;
1222  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1223  if (nrows_to_keep > 0) {
1224  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1225  auto deleted_row_start_offset =
1226  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1227  auto kept_row_start_offset =
1228  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1229  auto nbytes_to_keep =
1230  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1231  kept_row_start_offset;
1232  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1233  if (nbytes_to_keep > 0) {
1234  CHECK(data_addr);
1235  // move curr varlen row block toward front
1236  memmove(data_addr + ibyte_var_data_to_keep,
1237  data_addr + kept_row_start_offset,
1238  nbytes_to_keep);
1239  }
1240 
1241  const auto base_offset = kept_row_start_offset;
1242  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1243  auto update_index = irow_of_blk_to_keep + i;
1244  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1245  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1246  }
1247  }
1248  nbytes_var_data_to_keep += nbytes_to_keep;
1249  maddr_to_vacuum = indices_addr;
1250 
1251  constexpr static auto index_element_size = sizeof(StringOffsetT);
1252  nbytes_to_keep = nrows_to_keep * index_element_size;
1253  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1254  // move curr fixlen row block toward front
1255  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1256  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1257  nbytes_to_keep);
1258  }
1259  irow_of_blk_to_fill += nrows_to_keep;
1260  nbytes_fix_data_to_keep += nbytes_to_keep;
1261  }
1262  irow_of_blk_to_keep = irow_to_vacuum + 1;
1263  }
1264 
1265  // Set expected null padding, last offset, and negative values for null array offsets.
1266  index_array[0] = null_padding;
1267  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1268  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1269  if (!is_varlen_array) {
1270  CHECK(null_array_indexes.empty());
1271  }
1272  for (auto index : null_array_indexes) {
1273  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1274  }
1275  return nbytes_var_data_to_keep;
1276 }
std::set< size_t > get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
int32_t StringOffsetT
Definition: sqltypes.h:1224
#define CHECK(condition)
Definition: Logger.h:222
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 192 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 194 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 216 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 210 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 212 of file InsertOrderFragmenter.h.

Referenced by updateColumnMetadata(), and updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 197 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 218 of file InsertOrderFragmenter.h.

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 214 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 208 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 207 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 209 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 221 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 219 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutableprotected

Definition at line 246 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::vector<std::unique_ptr<Chunk_NS::Chunk> > Fragmenter_Namespace::InsertOrderFragmenter::tracked_in_memory_chunks_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 217 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 220 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: