OmniSciDB  91042dcc5b
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
size_t getNumFragments () override
 returns the number of fragments in a table More...
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row counts of all fragments More...
 
void insertData (InsertData &insert_data_struct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertChunks (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments. More...
 
void insertDataNoCheckpoint (InsertData &insert_data_struct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void insertChunksNoCheckpoint (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments; no locks or checkpoints taken. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
std::optional< ChunkUpdateStats > updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
void resetSizesFromFragments () override
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insert_data)
 
void insertChunksImpl (const InsertChunks &insert_chunk)
 
void addColumns (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Private Member Functions

bool isAddingNewColumns (const InsertData &insert_data) const
 
void dropFragmentsToSizeNoInsertLock (const size_t max_rows)
 
void setLastFragmentVarLenColumnSizes ()
 
void insertChunksIntoFragment (const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, const size_t start_fragment)
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 53 of file InsertOrderFragmenter.h.
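
The constructor shown above carries all the per-table state the fragmenter needs (chunk key prefix, shard, fragment sizing limits). Below is a minimal construction sketch using the documented signature and default arguments; db_id, table_id, data_mgr, and catalog are hypothetical values assumed to have been obtained from the catalog layer, and shard = -1 is assumed here to denote an unsharded table.

// Illustrative only: db_id, table_id, data_mgr, and catalog are assumed to exist.
std::vector<int> table_chunk_key = {db_id, table_id};   // {database_id, table_id}
std::vector<Chunk_NS::Chunk> chunk_vec;                 // one chunk per column
Fragmenter_Namespace::InsertOrderFragmenter fragmenter(
    table_chunk_key,
    chunk_vec,
    data_mgr,             // Data_Namespace::DataMgr*
    catalog,              // Catalog_Namespace::Catalog*
    table_id,             // physicalTableId
    -1,                   // shard (assumed -1 for an unsharded table)
    DEFAULT_FRAGMENT_ROWS,
    DEFAULT_MAX_CHUNK_SIZE,
    DEFAULT_PAGE_SIZE,
    DEFAULT_MAX_ROWS,
    Data_Namespace::DISK_LEVEL);

In practice the catalog constructs the fragmenter when a table's metadata is loaded; the sketch only shows how the parameters line up with the protected members listed above.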

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr dataMgr,
Catalog_Namespace::Catalog catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 95 of file InsertOrderFragmenter.cpp.

95 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::addColumns ( const InsertData insertDataStruct)
protected

Definition at line 498 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, i, and Fragmenter_Namespace::InsertData::numRows.

498  {
499  // synchronize concurrent accesses to fragmentInfoVec_
500  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
501  size_t numRowsLeft = insertDataStruct.numRows;
502  for (const auto columnId : insertDataStruct.columnIds) {
503  CHECK(columnMap_.end() == columnMap_.find(columnId));
504  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
505  CHECK(columnDesc);
506  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
507  }
508  try {
509  for (auto const& fragmentInfo : fragmentInfoVec_) {
510  fragmentInfo->shadowChunkMetadataMap =
511  fragmentInfo->getChunkMetadataMapPhysicalCopy();
512  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
513  size_t numRowsCanBeInserted;
514  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
515  auto columnId = insertDataStruct.columnIds[i];
516  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
517  CHECK(colDesc);
518  CHECK(columnMap_.find(columnId) != columnMap_.end());
519 
520  ChunkKey chunkKey = chunkKeyPrefix_;
521  chunkKey.push_back(columnId);
522  chunkKey.push_back(fragmentInfo->fragmentId);
523 
524  auto colMapIt = columnMap_.find(columnId);
525  auto& chunk = colMapIt->second;
526  if (chunk.isChunkOnDevice(
527  dataMgr_,
528  chunkKey,
529  defaultInsertLevel_,
530  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
531  dataMgr_->deleteChunksWithPrefix(chunkKey);
532  }
533  chunk.createChunkBuffer(
534  dataMgr_,
535  chunkKey,
536  defaultInsertLevel_,
537  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
538  chunk.initEncoder();
539 
540  try {
541  DataBlockPtr dataCopy = insertDataStruct.data[i];
542  auto size = colDesc->columnType.get_size();
543  if (0 > size) {
544  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
545  varLenColInfo_[columnId] = 0;
546  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
547  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
548  } else {
549  numRowsCanBeInserted = maxChunkSize_ / size;
550  }
551 
552  // FIXME: abort a case in which new column is wider than existing columns
553  if (numRowsCanBeInserted < numRowsToInsert) {
554  throw std::runtime_error("new column '" + colDesc->columnName +
555  "' wider than existing columns is not supported");
556  }
557 
558  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
559  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
560 
561  // update total size of var-len column in (actually the last) fragment
562  if (0 > size) {
563  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
564  varLenColInfo_[columnId] = chunk.getBuffer()->size();
565  }
566  } catch (...) {
567  dataMgr_->deleteChunksWithPrefix(chunkKey);
568  throw;
569  }
570  }
571  numRowsLeft -= numRowsToInsert;
572  }
573  CHECK(0 == numRowsLeft);
574  } catch (const std::exception& e) {
575  for (const auto columnId : insertDataStruct.columnIds) {
576  columnMap_.erase(columnId);
577  }
578  throw e;
579  }
580 
581  for (auto const& fragmentInfo : fragmentInfoVec_) {
582  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
583  }
584 }
std::vector< int > ChunkKey
Definition: types.h:37
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:493
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:211
std::map< int, Chunk_NS::Chunk > columnMap_
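
The width check inside addColumns is simple arithmetic: a fixed-width column can back-fill an existing fragment only if maxChunkSize_ / size rows fit in one chunk. A self-contained sketch of that check follows, with illustrative numbers (the 1 GB chunk limit is an assumption, not necessarily this build's DEFAULT_MAX_CHUNK_SIZE).

#include <cstddef>
#include <iostream>

int main() {
  // Illustrative values only: a 1 GB chunk limit and an 8-byte fixed-width column.
  const std::size_t max_chunk_size = 1ULL << 30;     // bytes per chunk
  const std::size_t column_width = 8;                // bytes per value
  const std::size_t rows_in_fragment = 200'000'000;  // rows already in the fragment

  // Same capacity test addColumns performs for fixed-width types.
  const std::size_t rows_that_fit = max_chunk_size / column_width;  // 134,217,728
  if (rows_that_fit < rows_in_fragment) {
    // addColumns throws here: the new column cannot back-fill the existing fragment.
    std::cout << "new column too wide for existing fragment\n";
  } else {
    std::cout << "column fits: " << rows_that_fit << " rows per chunk\n";
  }
}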


void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1271 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), getChunksForAllColumns(), getFragmentInfo(), Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), UpdelRoll::setNumTuple(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1276  {
1277  auto fragment_ptr = getFragmentInfo(fragment_id);
1278  auto& fragment = *fragment_ptr;
1279  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1280  const auto ncol = chunks.size();
1281 
1282  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1283 
1284  // parallel delete columns
1285  std::vector<std::future<void>> threads;
1286  auto nrows_to_vacuum = frag_offsets.size();
1287  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1288  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1289 
1290  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1291  auto chunk = chunks[ci];
1292  const auto cd = chunk->getColumnDesc();
1293  const auto& col_type = cd->columnType;
1294  auto data_buffer = chunk->getBuffer();
1295  auto index_buffer = chunk->getIndexBuf();
1296  auto data_addr = data_buffer->getMemoryPtr();
1297  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1298  auto index_array = (StringOffsetT*)indices_addr;
1299  bool is_varlen = col_type.is_varlen_indeed();
1300 
1301  auto fixlen_vacuum =
1302  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1303  size_t nbytes_fix_data_to_keep;
1304  if (nrows_to_keep == 0) {
1305  nbytes_fix_data_to_keep = 0;
1306  } else {
1307  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1308  }
1309 
1310  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1311  data_buffer->setSize(nbytes_fix_data_to_keep);
1312  data_buffer->setUpdated();
1313 
1314  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1315 
1316  auto daddr = data_addr;
1317  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1318  : get_element_size(col_type);
1319  data_buffer->getEncoder()->resetChunkStats();
1320  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1321  if (col_type.is_fixlen_array()) {
1322  auto encoder =
1323  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1324  CHECK(encoder);
1325  encoder->updateMetadata((int8_t*)daddr);
1326  } else if (col_type.is_fp()) {
1327  set_chunk_stats(col_type,
1328  daddr,
1329  update_stats_per_thread[ci].new_values_stats.has_null,
1330  update_stats_per_thread[ci].new_values_stats.min_double,
1331  update_stats_per_thread[ci].new_values_stats.max_double);
1332  } else {
1333  set_chunk_stats(col_type,
1334  daddr,
1335  update_stats_per_thread[ci].new_values_stats.has_null,
1336  update_stats_per_thread[ci].new_values_stats.min_int64t,
1337  update_stats_per_thread[ci].new_values_stats.max_int64t);
1338  }
1339  }
1340  };
1341 
1342  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1343  size_t nbytes_var_data_to_keep;
1344  if (nrows_to_keep == 0) {
1345  nbytes_var_data_to_keep = 0;
1346  } else {
1347  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1348  }
1349 
1350  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1351  data_buffer->setSize(nbytes_var_data_to_keep);
1352  data_buffer->setUpdated();
1353 
1354  index_buffer->setSize(sizeof(*index_array) *
1355  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1356  index_buffer->setUpdated();
1357 
1358  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1359  };
1360 
1361  if (is_varlen) {
1362  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1363  } else {
1364  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1365  }
1366  if (threads.size() >= (size_t)cpu_threads()) {
1367  wait_cleanup_threads(threads);
1368  }
1369  }
1370 
1371  wait_cleanup_threads(threads);
1372 
1373  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1374  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1375  auto chunk = chunks[ci];
1376  auto cd = chunk->getColumnDesc();
1377  if (!cd->columnType.is_fixlen_array()) {
1378  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1379  // stored in seconds. Do the metadata conversion here before updating the chunk
1380  // stats.
1381  if (cd->columnType.is_date_in_days()) {
1382  auto& stats = update_stats_per_thread[ci].new_values_stats;
1383  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1384  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1385  }
1386  updateColumnMetadata(cd,
1387  fragment,
1388  chunk,
1389  update_stats_per_thread[ci].new_values_stats,
1390  cd->columnType,
1391  updel_roll);
1392  }
1393  }
1394 }
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:1090
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:211
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
int cpu_threads()
Definition: thread_count.h:24
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
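
compactRows fans the per-column vacuum work out with std::async and drains the pending futures whenever their count reaches cpu_threads(). The self-contained sketch below shows only that bounded fan-out pattern; std::thread::hardware_concurrency() stands in for cpu_threads(), and the lambda body is a placeholder for vacuum_fixlen_rows / vacuum_varlen_rows.

#include <algorithm>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Wait for and discard all pending futures, as wait_cleanup_threads() does.
void drain(std::vector<std::future<void>>& tasks) {
  for (auto& t : tasks) {
    t.wait();
  }
  tasks.clear();
}

int main() {
  const size_t num_columns = 12;  // placeholder column count
  const size_t max_in_flight = std::max(1u, std::thread::hardware_concurrency());

  std::vector<std::future<void>> tasks;
  for (size_t ci = 0; ci < num_columns; ++ci) {
    tasks.emplace_back(std::async(std::launch::async, [ci] {
      // placeholder for the per-column fixed-length or var-length vacuum work
      std::cout << "vacuuming column " << ci << "\n";
    }));
    if (tasks.size() >= max_in_flight) {
      drain(tasks);  // bound the number of live threads
    }
  }
  drain(tasks);  // final wait, mirroring the end of compactRows
}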


void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 147 of file InsertOrderFragmenter.cpp.

References catalog_(), Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

147  {
148  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
149  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
150  // storage per table
151  if (!uses_foreign_storage_ &&
152  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
153  const TableDescriptor* td =
154  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
155  File_Namespace::FileMgrParams fileMgrParams;
156  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
157  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
158  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
159  }
160 }
int32_t maxRollbackEpochs
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:616
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.


FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 922 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

923  {
924  // also sets the new fragment as the insertBuffer for each column
925 
926  maxFragmentId_++;
927  auto newFragmentInfo = std::make_unique<FragmentInfo>();
928  newFragmentInfo->fragmentId = maxFragmentId_;
929  newFragmentInfo->shadowNumTuples = 0;
930  newFragmentInfo->setPhysicalNumTuples(0);
931  for (const auto levelSize : dataMgr_->levelSizes_) {
932  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
933  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
934  }
935  newFragmentInfo->physicalTableId = physicalTableId_;
936  newFragmentInfo->shard = shard_;
937 
938  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
939  colMapIt != columnMap_.end();
940  ++colMapIt) {
941  ChunkKey chunkKey = chunkKeyPrefix_;
942  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
943  chunkKey.push_back(maxFragmentId_);
944  colMapIt->second.createChunkBuffer(
945  dataMgr_,
946  chunkKey,
947  memoryLevel,
948  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
949  pageSize_);
950  colMapIt->second.initEncoder();
951  }
952 
953  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
954  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
955  return fragmentInfoVec_.back().get();
956 }
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< int > levelSizes_
Definition: DataMgr.h:227
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)


void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 268 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

268  {
269  // Fix a verified loophole on sharded logical table which is locked using logical
270  // tableId while it's its physical tables that can come here when fragments overflow
271  // during COPY. Locks on a logical table and its physical tables never intersect, which
272  // means potential races. It'll be an overkill to resolve a logical table to physical
273  // tables in DBHandler, ParseNode or other higher layers where the logical table is
274  // locked with Table Read/Write locks; it's easier to lock the logical table of its
275  // physical tables. A downside of this approach may be loss of parallel execution of
276  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
277  // operation, the loss seems not a big deal.
278  auto chunkKeyPrefix = chunkKeyPrefix_;
279  if (shard_ >= 0) {
280  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
281  }
282 
283  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
284  // SELECT and COPY may enter a deadlock
285  const auto delete_lock =
287 
288  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
289 
290  for (const auto fragId : dropFragIds) {
291  for (const auto& col : columnMap_) {
292  int colId = col.first;
293  vector<int> fragPrefix = chunkKeyPrefix_;
294  fragPrefix.push_back(colId);
295  fragPrefix.push_back(fragId);
296  dataMgr_->deleteChunksWithPrefix(fragPrefix);
297  }
298  }
299 }
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:155
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4289
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:493
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 586 of file InsertOrderFragmenter.cpp.

586  {
587  // prevent concurrent insert rows and drop column
588  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
589  // synchronize concurrent accesses to fragmentInfoVec_
590  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
591  for (auto const& fragmentInfo : fragmentInfoVec_) {
592  fragmentInfo->shadowChunkMetadataMap =
593  fragmentInfo->getChunkMetadataMapPhysicalCopy();
594  }
595 
596  for (const auto columnId : columnIds) {
597  auto cit = columnMap_.find(columnId);
598  if (columnMap_.end() != cit) {
599  columnMap_.erase(cit);
600  }
601 
602  vector<int> fragPrefix = chunkKeyPrefix_;
603  fragPrefix.push_back(columnId);
604  dataMgr_->deleteChunksWithPrefix(fragPrefix);
605 
606  for (const auto& fragmentInfo : fragmentInfoVec_) {
607  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
608  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
609  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
610  }
611  }
612  }
613  for (const auto& fragmentInfo : fragmentInfoVec_) {
614  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
615  }
616 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:493
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 235 of file InsertOrderFragmenter.cpp.

235  {
236  mapd_unique_lock<mapd_shared_mutex> insert_lock(insertMutex_);
237  dropFragmentsToSizeNoInsertLock(maxRows);
238 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock ( const size_t  max_rows)
private

Definition at line 240 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

240  {
241  // not safe to call from outside insertData
242  // b/c depends on insertLock around numTuples_
243 
244  // don't ever drop the only fragment!
245  if (fragmentInfoVec_.empty() ||
246  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
247  return;
248  }
249 
250  if (numTuples_ > max_rows) {
251  size_t preNumTuples = numTuples_;
252  vector<int> dropFragIds;
253  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
254  while (numTuples_ > targetRows) {
255  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
256  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
257  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
258  fragmentInfoVec_.pop_front();
259  CHECK_GE(numTuples_, numFragTuples);
260  numTuples_ -= numFragTuples;
261  }
262  deleteFragments(dropFragIds);
263  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
264  << " post: " << numTuples_ << " maxRows: " << max_rows;
265  }
266 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:205
#define CHECK_GE(x, y)
Definition: Logger.h:224
#define CHECK_GT(x, y)
Definition: Logger.h:223
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
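
The loop above drops whole fragments from the front of fragmentInfoVec_ until numTuples_ falls to max_rows * DROP_FRAGMENT_FACTOR. The value of DROP_FRAGMENT_FACTOR is not shown on this page, so the self-contained simulation below uses 0.9 purely for illustration.

#include <cstddef>
#include <deque>
#include <iostream>
#include <vector>

int main() {
  // Each entry is the physical row count of one fragment, oldest first.
  std::deque<std::size_t> fragment_rows = {32'000'000, 32'000'000, 32'000'000, 5'000'000};
  std::size_t num_tuples = 101'000'000;
  const std::size_t max_rows = 80'000'000;
  const double drop_fragment_factor = 0.9;  // illustrative; not the real constant

  if (num_tuples > max_rows) {
    std::vector<std::size_t> dropped;
    const std::size_t target_rows =
        static_cast<std::size_t>(max_rows * drop_fragment_factor);
    while (num_tuples > target_rows && fragment_rows.size() > 1) {
      num_tuples -= fragment_rows.front();  // oldest fragment goes first
      dropped.push_back(fragment_rows.front());
      fragment_rows.pop_front();
    }
    std::cout << "dropped " << dropped.size() << " fragments, "
              << num_tuples << " rows remain\n";
  }
}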
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 119 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 162 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

162  {
163  if (uses_foreign_storage_ ||
164  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
165  // memory-resident tables won't have anything on disk
166  ChunkMetadataVector chunk_metadata;
167  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
168 
169  // data comes like this - database_id, table_id, column_id, fragment_id
170  // but lets sort by database_id, table_id, fragment_id, column_id
171 
172  int fragment_subkey_index = 3;
173  std::sort(chunk_metadata.begin(),
174  chunk_metadata.end(),
175  [&](const auto& pair1, const auto& pair2) {
176  return pair1.first[3] < pair2.first[3];
177  });
178 
179  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
180  ++chunk_itr) {
181  int cur_column_id = chunk_itr->first[2];
182  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
183 
184  if (fragmentInfoVec_.empty() ||
185  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
186  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
187  CHECK(new_fragment_info);
188  maxFragmentId_ = cur_fragment_id;
189  new_fragment_info->fragmentId = cur_fragment_id;
190  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
191  numTuples_ += new_fragment_info->getPhysicalNumTuples();
192  for (const auto level_size : dataMgr_->levelSizes_) {
193  new_fragment_info->deviceIds.push_back(
194  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
195  }
196  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
197  new_fragment_info->physicalTableId = physicalTableId_;
198  new_fragment_info->shard = shard_;
199  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
200  } else {
201  if (chunk_itr->second->numElements !=
202  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
203  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
204  std::to_string(physicalTableId_) + ", Column " +
205  std::to_string(cur_column_id) + ". Fragment Tuples: " +
206  std::to_string(
207  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
208  ", Chunk Tuples: " +
209  std::to_string(chunk_itr->second->numElements);
210  }
211  }
212  CHECK(fragmentInfoVec_.back().get());
213  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
214  }
215  }
216 
217  size_t maxFixedColSize = 0;
218 
219  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
220  auto size = colIt->second.getColumnDesc()->columnType.get_size();
221  if (size == -1) { // variable length
222  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
223  size = 8; // b/c we use this for string and array indices - gross to have magic
224  // number here
225  }
226  CHECK_GE(size, 0);
227  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
228  }
229 
230  // this is maximum number of rows assuming everything is fixed length
231  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
232  setLastFragmentVarLenColumnSizes();
233 }
std::vector< int > levelSizes_
Definition: DataMgr.h:227
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:205
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:224
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:467
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:211
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)
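
The final statement of getChunkMetadata clamps maxFragmentRows_ so the widest fixed-length column still fits inside a single chunk, with variable-length columns counted as 8 bytes for their index entries. A small self-contained sketch of that clamp, using made-up column widths and limits:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  // Hypothetical column widths in bytes; -1 marks a variable-length column.
  std::vector<int> column_widths = {4, 8, -1, 2};
  std::size_t max_fragment_rows = 32'000'000;     // illustrative default
  const std::size_t max_chunk_size = 1ULL << 30;  // illustrative 1 GB limit

  std::size_t max_fixed_col_size = 0;
  for (int size : column_widths) {
    if (size == -1) {
      size = 8;  // var-len columns contribute their 8-byte index entries
    }
    max_fixed_col_size = std::max(max_fixed_col_size, static_cast<std::size_t>(size));
  }

  // maxFragmentRows_ = min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize)
  max_fragment_rows = std::min(max_fragment_rows, max_chunk_size / max_fixed_col_size);
  std::cout << "rows per fragment: " << max_fragment_rows << "\n";  // 32000000
}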


auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor td,
const FragmentInfo fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 982 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

985  {
986  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
987  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
988  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
989  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
990  ++ncol;
991  if (!cd->isVirtualCol) {
992  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
993  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
994  ChunkKey chunk_key{
995  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
996  auto chunk = Chunk_NS::Chunk::getChunk(cd,
997  &catalog_->getDataMgr(),
998  chunk_key,
999  memory_level,
1000  0,
1001  chunk_meta_it->second->numBytes,
1002  chunk_meta_it->second->numElements);
1003  chunks.push_back(chunk);
1004  }
1005  }
1006  }
1007  return chunks;
1008 }
std::vector< int > ChunkKey
Definition: types.h:37
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:229
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:228
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:211
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:30


int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 118 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

118 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 123 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 405 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

405  {
406  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
407  fragmentInfoVec_.end(),
408  [fragment_id](const auto& fragment) -> bool {
409  return fragment->fragmentId == fragment_id;
410  });
411  CHECK(fragment_it != fragmentInfoVec_.end());
412  return fragment_it->get();
413 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:211


FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 963 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

963  {
964  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
965  TableInfo queryInfo;
966  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
967  // right now we don't test predicate, so just return (copy of) all fragments
968  bool fragmentsExist = false;
969  if (fragmentInfoVec_.empty()) {
970  // If we have no fragments add a dummy empty fragment to make the executor
971  // not have separate logic for 0-row tables
972  int maxFragmentId = 0;
973  FragmentInfo emptyFragmentInfo;
974  emptyFragmentInfo.fragmentId = maxFragmentId;
975  emptyFragmentInfo.shadowNumTuples = 0;
976  emptyFragmentInfo.setPhysicalNumTuples(0);
977  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
978  emptyFragmentInfo.physicalTableId = physicalTableId_;
979  emptyFragmentInfo.shard = shard_;
980  queryInfo.fragments.push_back(emptyFragmentInfo);
981  } else {
982  fragmentsExist = true;
983  std::for_each(
984  fragmentInfoVec_.begin(),
985  fragmentInfoVec_.end(),
986  [&queryInfo](const auto& fragment_owned_ptr) {
987  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
988  });
989  }
990  readLock.unlock();
991  queryInfo.setPhysicalNumTuples(0);
992  auto partIt = queryInfo.fragments.begin();
993  if (fragmentsExist) {
994  while (partIt != queryInfo.fragments.end()) {
995  if (partIt->getPhysicalNumTuples() == 0) {
996  // this means that a concurrent insert query inserted tuples into a new fragment
997  // but when the query came in we didn't have this fragment. To make sure we
998  // don't mess up the executor we delete this fragment from the metadatamap
999  // (fixes earlier bug found 2015-05-08)
1000  partIt = queryInfo.fragments.erase(partIt);
1001  } else {
1002  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1003  partIt->getPhysicalNumTuples());
1004  ++partIt;
1005  }
1006  }
1007  } else {
1008  // We added a dummy fragment and know the table is empty
1009  queryInfo.setPhysicalNumTuples(0);
1010  }
1011  return queryInfo;
1012 }
std::vector< int > levelSizes_
Definition: DataMgr.h:227
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
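
Because getFragmentsForQuery copies the fragment metadata into the returned TableInfo, callers can walk the result without holding fragmentInfoMutex_. A hypothetical consumption sketch, using only members documented on this page (fragments, getPhysicalNumTuples) and assuming a live fragmenter for an existing table:

// Illustrative only; 'fragmenter' is assumed to exist.
Fragmenter_Namespace::TableInfo table_info = fragmenter.getFragmentsForQuery();
size_t total_rows = 0;
for (const auto& fragment : table_info.fragments) {
  total_rows += fragment.getPhysicalNumTuples();
}
// An empty table still reports one dummy fragment, per the listing above,
// so table_info.fragments.size() is always at least 1.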


size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumFragments ( )
overridevirtual

returns the number of fragments in a table

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 958 of file InsertOrderFragmenter.cpp.

958  {
959  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
960  return fragmentInfoVec_.size();
961 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1011 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), i, and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1012  {
1013  const auto data_buffer = chunk->getBuffer();
1014  const auto data_addr = data_buffer->getMemoryPtr();
1015  const size_t nrows_in_chunk = data_buffer->size();
1016  const size_t ncore = cpu_threads();
1017  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1018  std::vector<std::vector<uint64_t>> deleted_offsets;
1019  deleted_offsets.resize(ncore);
1020  std::vector<std::future<void>> threads;
1021  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1022  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1023  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1024  const auto ithread = rbegin / segsz;
1025  CHECK(ithread < deleted_offsets.size());
1026  deleted_offsets[ithread].reserve(segsz);
1027  for (size_t r = rbegin; r < rend; ++r) {
1028  if (data_addr[r]) {
1029  deleted_offsets[ithread].push_back(r);
1030  }
1031  }
1032  }));
1033  }
1034  wait_cleanup_threads(threads);
1035  std::vector<uint64_t> all_deleted_offsets;
1036  for (size_t i = 0; i < ncore; ++i) {
1037  all_deleted_offsets.insert(
1038  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1039  }
1040  return all_deleted_offsets;
1041 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
#define CHECK(condition)
Definition: Logger.h:211
int cpu_threads()
Definition: thread_count.h:24
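
getVacuumOffsets splits the delete column's byte buffer into cpu_threads() segments, scans each segment in parallel for non-zero bytes, and concatenates the per-thread offset lists. The following self-contained sketch reproduces that segmented scan over a plain byte vector; std::thread::hardware_concurrency() stands in for cpu_threads().

#include <algorithm>
#include <cstdint>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // Stand-in for the deleted-rows column: non-zero means the row was deleted.
  std::vector<int8_t> deleted = {0, 1, 0, 0, 1, 1, 0, 0, 0, 1};
  const size_t nrows = deleted.size();
  const size_t ncore = std::max(1u, std::thread::hardware_concurrency());
  const size_t segsz = (nrows + ncore - 1) / ncore;  // ceil(nrows / ncore)

  std::vector<std::vector<uint64_t>> per_thread(ncore);
  std::vector<std::future<void>> tasks;
  for (size_t rbegin = 0; rbegin < nrows; rbegin += segsz) {
    tasks.emplace_back(std::async(std::launch::async, [&, rbegin] {
      const size_t rend = std::min(rbegin + segsz, nrows);
      auto& out = per_thread[rbegin / segsz];
      for (size_t r = rbegin; r < rend; ++r) {
        if (deleted[r]) {
          out.push_back(r);  // record the offset of a deleted row
        }
      }
    }));
  }
  for (auto& t : tasks) {
    t.wait();
  }

  std::vector<uint64_t> offsets;
  for (const auto& part : per_thread) {
    offsets.insert(offsets.end(), part.begin(), part.end());
  }
  std::cout << offsets.size() << " deleted rows\n";  // prints: 4 deleted rows
}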


bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 618 of file InsertOrderFragmenter.cpp.

References CHECK.

618  {
619  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
620 
621  for (auto const& fragment : fragmentInfoVec_) {
622  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
623  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
624  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
625  if (chunk_stats.max.tinyintval == 1) {
626  return true;
627  }
628  }
629  return false;
630 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:211
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunks ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments.

Parameters
insert_chunk - the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 430 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertChunks::db_id, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertChunks::table_id.

430  {
431  try {
432  // prevent two threads from trying to insert into the same table simultaneously
433  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
434  insertChunksImpl(insert_chunk);
435  if (defaultInsertLevel_ ==
436  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
437  dataMgr_->checkpoint(
438  chunkKeyPrefix_[0],
439  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
440  }
441  } catch (...) {
442  auto db_id = insert_chunk.db_id;
443  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
444  // the statement below deletes *this* object!
445  // relying on exception propagation at this stage
446  // until we can sort this out in a cleaner fashion
447  catalog_->setTableEpochs(db_id, table_epochs);
448  throw;
449  }
450 }
void insertChunksImpl(const InsertChunks &insert_chunk)
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:555
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3160
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3132


void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksImpl ( const InsertChunks insert_chunk)
protected

Definition at line 692 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, and Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::get_num_rows_to_insert().

692  {
693  std::optional<int> delete_column_id{std::nullopt};
694  for (const auto& cit : columnMap_) {
695  if (cit.second.getColumnDesc()->isDeletedCol) {
696  delete_column_id = cit.second.getColumnDesc()->columnId;
697  }
698  }
699 
700  // verify that all chunks to be inserted have same number of rows, otherwise the input
701  // data is malformed
702  std::optional<size_t> num_rows{std::nullopt};
703  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
704  auto buffer = chunk->getBuffer();
705  CHECK(buffer);
706  CHECK(buffer->hasEncoder());
707  CHECK(buffer->getEncoder()->getNumElems());
708  if (!num_rows) {
709  num_rows = buffer->getEncoder()->getNumElems();
710  } else {
711  CHECK(num_rows == buffer->getEncoder()->getNumElems());
712  }
713  }
714 
715  size_t num_rows_left = *num_rows;
716  size_t num_rows_inserted = 0;
717 
718  if (num_rows_left == 0) {
719  return;
720  }
721 
722  FragmentInfo* current_fragment{nullptr};
723 
724  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
725  // feels fragile
726  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
727  current_fragment = createNewFragment(defaultInsertLevel_);
728  } else {
729  current_fragment = fragmentInfoVec_.back().get();
730  }
731  CHECK(current_fragment);
732 
733  size_t start_fragment = fragmentInfoVec_.size() - 1;
734 
735  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
736  // loop until done inserting all rows
737  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
738  size_t rows_left_in_current_fragment =
739  maxFragmentRows_ - current_fragment->shadowNumTuples;
740  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
741  num_rows_left,
742  num_rows_inserted,
743  varLenColInfo_,
744  maxChunkSize_,
745  insert_chunks,
746  columnMap_);
747 
748  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
749  current_fragment = createNewFragment(defaultInsertLevel_);
750  if (num_rows_inserted == 0) {
751  start_fragment++;
752  }
753  rows_left_in_current_fragment = maxFragmentRows_;
754  for (auto& varLenColInfoIt : varLenColInfo_) {
755  varLenColInfoIt.second = 0; // reset byte counter
756  }
757  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
758  num_rows_left,
759  num_rows_inserted,
760  varLenColInfo_,
761  maxChunkSize_,
762  insert_chunks,
763  columnMap_);
764  }
765 
766  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
767  // never be able to insert anything
768 
769  insertChunksIntoFragment(insert_chunks,
770  delete_column_id,
771  current_fragment,
772  num_rows_to_insert,
773  num_rows_inserted,
774  num_rows_left,
775  start_fragment);
776  }
777  numTuples_ += *num_rows;
778  dropFragmentsToSizeNoInsertLock(maxRows_);
779 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
void insertChunksIntoFragment(const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, const size_t start_fragment)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:223
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:222
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment, const size_t num_rows_left, const size_t num_rows_inserted, const std::unordered_map< int, size_t > &var_len_col_info, const size_t max_chunk_size, const InsertChunks &insert_chunks, std::map< int, Chunk_NS::Chunk > &column_map)
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:211
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksIntoFragment ( const InsertChunks insert_chunks,
const std::optional< int >  delete_column_id,
FragmentInfo current_fragment,
const size_t  num_rows_to_insert,
size_t &  num_rows_inserted,
size_t &  num_rows_left,
const size_t  start_fragment 
)
private

Definition at line 632 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::FragmentInfo::fragmentId, i, DataBlockPtr::numbersPtr, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

639  {
640  mapd_unique_lock<mapd_shared_mutex> write_lock(fragmentInfoMutex_);
641  // for each column, append the data in the appropriate insert buffer
642  for (auto& [columnId, chunk] : insert_chunks.chunks) {
643  auto colMapIt = columnMap_.find(columnId);
644  CHECK(colMapIt != columnMap_.end());
645  current_fragment->shadowChunkMetadataMap[columnId] =
646  colMapIt->second.appendEncodedData(*chunk, num_rows_to_insert, num_rows_inserted);
647  auto varLenColInfoIt = varLenColInfo_.find(columnId);
648  if (varLenColInfoIt != varLenColInfo_.end()) {
649  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
650  CHECK_LE(varLenColInfoIt->second, maxChunkSize_);
651  }
652  }
653  if (hasMaterializedRowId_) {
654  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
655  current_fragment->shadowNumTuples;
656  std::vector<int64_t> row_id_data(num_rows_to_insert);
657  for (size_t i = 0; i < num_rows_to_insert; ++i) {
658  row_id_data[i] = i + start_id;
659  }
660  DataBlockPtr row_id_block;
661  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
662  auto col_map_it = columnMap_.find(rowIdColId_);
663  CHECK(col_map_it != columnMap_.end());
664  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
665  row_id_block, num_rows_to_insert, num_rows_inserted);
666  }
667 
668  if (delete_column_id) { // has delete column
669  std::vector<int8_t> delete_data(num_rows_to_insert, false);
670  DataBlockPtr delete_block;
671  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
672  auto col_map_it = columnMap_.find(*delete_column_id);
673  CHECK(col_map_it != columnMap_.end());
674  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
675  col_map_it->second.appendData(
676  delete_block, num_rows_to_insert, num_rows_inserted);
677  }
678 
679  current_fragment->shadowNumTuples =
680  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
681  num_rows_left -= num_rows_to_insert;
682  num_rows_inserted += num_rows_to_insert;
683  for (auto partIt = fragmentInfoVec_.begin() + start_fragment;
684  partIt != fragmentInfoVec_.end();
685  ++partIt) {
686  auto fragment_ptr = partIt->get();
687  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
688  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
689  }
690 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_LE(x, y)
Definition: Logger.h:222
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:211
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:226
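
When the table has a materialized rowid column, the generated ids are consecutive integers starting at maxFragmentRows_ * fragmentId + shadowNumTuples, which keeps rowids unique across fragments. A tiny self-contained worked example with illustrative numbers:

#include <cstddef>
#include <iostream>

int main() {
  // Illustrative values: fragment capacity, the fragment being filled, rows already in it.
  const std::size_t max_fragment_rows = 32'000'000;
  const std::size_t fragment_id = 2;
  const std::size_t shadow_num_tuples = 1'500;
  const std::size_t num_rows_to_insert = 3;

  // start_id = maxFragmentRows_ * fragmentId + shadowNumTuples
  const std::size_t start_id = max_fragment_rows * fragment_id + shadow_num_tuples;
  for (std::size_t i = 0; i < num_rows_to_insert; ++i) {
    std::cout << "rowid " << (start_id + i) << "\n";  // 64001500, 64001501, 64001502
  }
}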
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksNoCheckpoint ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments; no locks or checkpoints taken.

Parameters
chunk - the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 478 of file InsertOrderFragmenter.cpp.

478  {
479  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
480  mapd_unique_lock<mapd_shared_mutex> insertLock(
481  insertMutex_); // prevent two threads from trying to insert into the same table
482  // simultaneously
483  insertChunksImpl(insert_chunk);
484 }
void insertChunksImpl(const InsertChunks &insert_chunk)
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData insert_data_struct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 452 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

452  {
453  try {
454  // prevent two threads from trying to insert into the same table simultaneously
455  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
456  if (!isAddingNewColumns(insert_data_struct)) {
457  insertDataImpl(insert_data_struct);
458  } else {
459  addColumns(insert_data_struct);
460  }
461  if (defaultInsertLevel_ ==
462  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
463  dataMgr_->checkpoint(
464  chunkKeyPrefix_[0],
465  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
466  }
467  } catch (...) {
468  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
469  insert_data_struct.tableId);
470  // the statement below deletes *this* object!
471  // relying on exception propagation at this stage
472  // until we can sort this out in a cleaner fashion
473  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
474  throw;
475  }
476 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:555
void addColumns(const InsertData &insertDataStruct)
bool isAddingNewColumns(const InsertData &insert_data) const
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3160
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3132
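
A hypothetical call sketch for insertData, using only the InsertData fields referenced above (databaseId, tableId, columnIds, data, numRows, is_default) and the documented DataBlockPtr::numbersPtr member; db_id, table_id, column_id, and the int64 buffer are made-up values for a single-column BIGINT table.

// Illustrative only; 'fragmenter', db_id, table_id, and column_id are assumed to exist.
std::vector<int64_t> values = {1, 2, 3};

Fragmenter_Namespace::InsertData insert_data;
insert_data.databaseId = db_id;
insert_data.tableId = table_id;
insert_data.columnIds = {column_id};   // physical column id of the BIGINT column
insert_data.numRows = values.size();

DataBlockPtr block;
block.numbersPtr = reinterpret_cast<int8_t*>(values.data());
insert_data.data = {block};
insert_data.is_default = {false};

fragmenter.insertData(insert_data);    // takes the insert lock and checkpoints on disk-level tables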


void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData insert_data)
protected

Definition at line 781 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, i, Fragmenter_Namespace::InsertData::is_default, DataBlockPtr::numbersPtr, and Fragmenter_Namespace::InsertData::numRows.

781  {
782  // populate deleted system column if it should exist, as it will not come from client
783  std::unique_ptr<int8_t[]> data_for_deleted_column;
784  for (const auto& cit : columnMap_) {
785  if (cit.second.getColumnDesc()->isDeletedCol) {
786  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
787  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
788  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
789  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
790  insert_data.is_default.push_back(false);
791  break;
792  }
793  }
794  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
795  std::unordered_map<int, int> inverseInsertDataColIdMap;
796  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
797  inverseInsertDataColIdMap.insert(
798  std::make_pair(insert_data.columnIds[insertId], insertId));
799  }
800 
801  size_t numRowsLeft = insert_data.numRows;
802  size_t numRowsInserted = 0;
803  vector<DataBlockPtr> dataCopy =
804  insert_data.data; // bc append data will move ptr forward and this violates
805  // constness of InsertData
806  if (numRowsLeft <= 0) {
807  return;
808  }
809 
810  FragmentInfo* currentFragment{nullptr};
811 
812  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
813  // feels fragile
814  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
815  currentFragment = createNewFragment(defaultInsertLevel_);
816  } else {
817  currentFragment = fragmentInfoVec_.back().get();
818  }
819  CHECK(currentFragment);
820 
821  size_t startFragment = fragmentInfoVec_.size() - 1;
822 
823  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
824  // loop until done inserting all rows
825  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
826  size_t rowsLeftInCurrentFragment =
827  maxFragmentRows_ - currentFragment->shadowNumTuples;
828  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
829  if (rowsLeftInCurrentFragment != 0) {
830  for (auto& varLenColInfoIt : varLenColInfo_) {
831  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
832  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
833  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
834  if (insertIdIt != inverseInsertDataColIdMap.end()) {
835  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
836  numRowsToInsert = std::min(numRowsToInsert,
837  colMapIt->second.getNumElemsForBytesInsertData(
838  dataCopy[insertIdIt->second],
839  numRowsToInsert,
840  numRowsInserted,
841  bytesLeft,
842  insert_data.is_default[insertIdIt->second]));
843  }
844  }
845  }
846 
847  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
848  currentFragment = createNewFragment(defaultInsertLevel_);
849  if (numRowsInserted == 0) {
850  startFragment++;
851  }
852  rowsLeftInCurrentFragment = maxFragmentRows_;
853  for (auto& varLenColInfoIt : varLenColInfo_) {
854  varLenColInfoIt.second = 0; // reset byte counter
855  }
856  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
857  for (auto& varLenColInfoIt : varLenColInfo_) {
858  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
859  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
860  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
861  if (insertIdIt != inverseInsertDataColIdMap.end()) {
862  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
863  numRowsToInsert = std::min(numRowsToInsert,
864  colMapIt->second.getNumElemsForBytesInsertData(
865  dataCopy[insertIdIt->second],
866  numRowsToInsert,
867  numRowsInserted,
868  bytesLeft,
869  insert_data.is_default[insertIdIt->second]));
870  }
871  }
872  }
873 
874  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
875  // never be able to insert anything
876 
877  {
878  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
879  // for each column, append the data in the appropriate insert buffer
880  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
881  int columnId = insert_data.columnIds[i];
882  auto colMapIt = columnMap_.find(columnId);
883  CHECK(colMapIt != columnMap_.end());
884  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
885  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
886  auto varLenColInfoIt = varLenColInfo_.find(columnId);
887  if (varLenColInfoIt != varLenColInfo_.end()) {
888  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
889  }
890  }
891  if (hasMaterializedRowId_) {
892  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
893  currentFragment->shadowNumTuples;
894  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
895  for (size_t i = 0; i < numRowsToInsert; ++i) {
896  row_id_data[i] = i + startId;
897  }
898  DataBlockPtr rowIdBlock;
899  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
900  auto colMapIt = columnMap_.find(rowIdColId_);
901  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
902  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
903  }
904 
905  currentFragment->shadowNumTuples =
906  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
907  numRowsLeft -= numRowsToInsert;
908  numRowsInserted += numRowsToInsert;
909  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
910  partIt != fragmentInfoVec_.end();
911  ++partIt) {
912  auto fragment_ptr = partIt->get();
913  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
914  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
915  }
916  }
917  }
918  numTuples_ += insert_data.numRows;
919  dropFragmentsToSizeNoInsertLock(maxRows_);
920 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:223
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:222
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:211
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:226
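
The loop above caps each pass at min(rowsLeftInCurrentFragment, numRowsLeft), tightens that cap further via the variable-length byte budget in varLenColInfo_, and calls createNewFragment() whenever the current fragment has no room left. A standalone sketch of just the row-count arithmetic (the byte budget is omitted), assuming a fixed per-fragment capacity:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Split a bulk insert of num_rows rows across fragments that hold at most
// max_fragment_rows rows each, starting from a partially filled last fragment.
// Returns the number of rows placed into each fragment that gets touched.
std::vector<std::size_t> plan_fragment_fill(std::size_t num_rows,
                                            std::size_t rows_in_last_fragment,
                                            std::size_t max_fragment_rows) {
  std::vector<std::size_t> per_fragment;
  std::size_t rows_left = num_rows;
  std::size_t room = max_fragment_rows - rows_in_last_fragment;
  while (rows_left > 0) {
    if (room == 0) {
      room = max_fragment_rows;  // "create" a new, empty fragment
    }
    const std::size_t to_insert = std::min(room, rows_left);
    per_fragment.push_back(to_insert);
    rows_left -= to_insert;
    room -= to_insert;
  }
  return per_fragment;
}

int main() {
  // 25 rows into fragments of 10, with 7 rows already in the last fragment:
  // batches of 3, 10, 10 and 2 rows.
  for (auto batch : plan_fragment_fill(25, 7, 10)) {
    std::cout << batch << " ";
  }
  std::cout << "\n";
  return 0;
}
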
void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData insert_data_struct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 486 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

486  {
487  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
488  mapd_unique_lock<mapd_shared_mutex> insertLock(
489  insertMutex_); // prevent two threads from trying to insert into the same table
490  // simultaneously
491  if (!isAddingNewColumns(insert_data_struct)) {
492  insertDataImpl(insert_data_struct);
493  } else {
494  addColumns(insert_data_struct);
495  }
496 }
void addColumns(const InsertData &insertDataStruct)
bool isAddingNewColumns(const InsertData &insert_data) const


bool Fragmenter_Namespace::InsertOrderFragmenter::isAddingNewColumns ( const InsertData insert_data) const
private

Definition at line 415 of file InsertOrderFragmenter.cpp.

References CHECK, and Fragmenter_Namespace::InsertData::columnIds.

415  {
416  bool all_columns_already_exist = true, all_columns_are_new = true;
417  for (const auto column_id : insert_data.columnIds) {
418  if (columnMap_.find(column_id) == columnMap_.end()) {
419  all_columns_already_exist = false;
420  } else {
421  all_columns_are_new = false;
422  }
423  }
424  // only one should be TRUE
425  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
426  CHECK(either_all_exist_or_all_new);
427  return all_columns_are_new;
428 }
#define CHECK(condition)
Definition: Logger.h:211
std::map< int, Chunk_NS::Chunk > columnMap_
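
The CHECK encodes the invariant that a single insert either targets only columns already present in columnMap_ or only columns the fragmenter has never seen (as in the back-fill performed when columns are added to an existing table); a mixed set is treated as a logic error. A small standalone illustration of the same check:

#include <cassert>
#include <set>
#include <vector>

// Mirrors isAddingNewColumns(): returns true only when every inserted column
// id is unknown, false only when every inserted column id is already known.
bool is_adding_new_columns(const std::vector<int>& insert_column_ids,
                           const std::set<int>& known_column_ids) {
  bool all_columns_already_exist = true;
  bool all_columns_are_new = true;
  for (const int column_id : insert_column_ids) {
    if (known_column_ids.count(column_id)) {
      all_columns_are_new = false;
    } else {
      all_columns_already_exist = false;
    }
  }
  // exactly one of the two must hold, like the CHECK() above
  assert(all_columns_already_exist != all_columns_are_new);
  return all_columns_are_new;
}

int main() {
  const std::set<int> known{1, 2, 3};
  assert(!is_adding_new_columns({1, 2, 3}, known));  // ordinary insert
  assert(is_adding_new_columns({4, 5}, known));      // back-fill of new columns
  return 0;
}
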
void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::resetSizesFromFragments ( )
overridevirtual

Resets the fragmenter's size related metadata using the internal fragment info vector. This is typically done after operations, such as vacuuming, which can change fragment sizes.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1014 of file InsertOrderFragmenter.cpp.

1014  {
1015  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
1016  numTuples_ = 0;
1017  for (const auto& fragment_info : fragmentInfoVec_) {
1018  numTuples_ += fragment_info->getPhysicalNumTuples();
1019  }
1020  setLastFragmentVarLenColumnSizes();
1021 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
mapd_shared_lock< mapd_shared_mutex > read_lock
void Fragmenter_Namespace::InsertOrderFragmenter::setLastFragmentVarLenColumnSizes ( )
private

Definition at line 1023 of file InsertOrderFragmenter.cpp.

1023  {
1024  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1025  // Now need to get the insert buffers for each column - should be last
1026  // fragment
1027  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1028  // TODO: add accessor here for safe indexing
1029  int deviceId =
1030  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1031  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1032  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1033  insertKey.push_back(colIt->first); // column id
1034  insertKey.push_back(lastFragmentId); // fragment id
1035  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1036  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1037  if (varLenColInfoIt != varLenColInfo_.end()) {
1038  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1039  }
1040  }
1041  }
1042 }
std::vector< int > ChunkKey
Definition: types.h:37
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor cd,
std::unordered_map< int, ChunkStats > &  stats_map,
std::optional< Data_Namespace::MemoryLevel memory_level 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 314 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

317  {
318  // synchronize concurrent accesses to fragmentInfoVec_
319  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
325  if (shard_ >= 0) {
326  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
327  }
328 
329  CHECK(cd);
330  const auto column_id = cd->columnId;
331  const auto col_itr = columnMap_.find(column_id);
332  CHECK(col_itr != columnMap_.end());
333 
334  for (auto const& fragment : fragmentInfoVec_) {
335  auto stats_itr = stats_map.find(fragment->fragmentId);
336  if (stats_itr != stats_map.end()) {
337  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
338  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
339  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
340  physicalTableId_,
341  column_id,
342  fragment->fragmentId};
343  auto chunk = Chunk_NS::Chunk::getChunk(cd,
344  &catalog_->getDataMgr(),
345  chunk_key,
346  memory_level.value_or(defaultInsertLevel_),
347  0,
348  chunk_meta_it->second->numBytes,
349  chunk_meta_it->second->numElements);
350  auto buf = chunk->getBuffer();
351  CHECK(buf);
352  if (!buf->hasEncoder()) {
353  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
354  }
355  auto encoder = buf->getEncoder();
356 
357  auto chunk_stats = stats_itr->second;
358 
359  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
360  encoder->getMetadata(old_chunk_metadata);
361  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
362 
363  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
364  // Use the logical type to display data, since the encoding should be ignored
365  const auto logical_ti = cd->columnType.is_dict_encoded_string()
366  ? SQLTypeInfo(kBIGINT, false)
367  : get_logical_type_info(cd->columnType);
368  if (!didResetStats) {
369  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
370  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
371  << DatumToString(chunk_stats.max, logical_ti);
372  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
373  << DatumToString(chunk_stats.min, logical_ti);
374  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
375  continue; // move to next fragment
376  }
377 
378  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
379  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
380  << DatumToString(chunk_stats.max, logical_ti);
381  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
382  << DatumToString(chunk_stats.min, logical_ti);
383  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
384 
385  // Reset fragment metadata map and set buffer to dirty
386  auto new_metadata = std::make_shared<ChunkMetadata>();
387  // Run through fillChunkStats to ensure any transformations to the raw metadata
388  // values get applied (e.g. for date in days)
389  encoder->getMetadata(new_metadata);
390 
391  fragment->setChunkMetadata(column_id, new_metadata);
392  fragment->shadowChunkMetadataMap =
393  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
395  buf->setDirty();
396  }
397  } else {
398  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
399  << ", table " << physicalTableId_ << ", "
400  << ", column " << column_id;
401  }
402  }
403 }
std::vector< int > ChunkKey
Definition: types.h:37
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:392
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:229
#define LOG(tag)
Definition: Logger.h:205
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1064
std::string show_chunk(const ChunkKey &key)
Definition: types.h:94
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:228
#define CHECK(condition)
Definition: Logger.h:211
bool is_dict_encoded_string() const
Definition: sqltypes.h:557
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:30
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:305
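
Per fragment, the method loads the chunk, asks the encoder to reset its stats, and rewrites the fragment's metadata (and dirties the buffer) only when the encoder reports that something actually changed. A simplified standalone sketch of that skip-or-reset decision; Stats and reset_chunk_stats are hypothetical stand-ins, not the Encoder API:

#include <iostream>

struct Stats {
  long min;
  long max;
  bool has_nulls;
};

// Apply the incoming stats only if they differ from what the chunk already
// has; the caller then knows whether the fragment metadata must be rewritten.
bool reset_chunk_stats(Stats& current, const Stats& incoming) {
  if (current.min == incoming.min && current.max == incoming.max &&
      current.has_nulls == incoming.has_nulls) {
    return false;  // nothing to do; the real code logs "Skipping chunk stats reset"
  }
  current = incoming;
  return true;
}

int main() {
  Stats chunk_stats{0, 100, false};
  const Stats narrowed{10, 90, false};
  if (reset_chunk_stats(chunk_stats, narrowed)) {
    std::cout << "Resetting chunk stats: min " << chunk_stats.min << ", max "
              << chunk_stats.max << "\n";
  }
  return 0;
}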


std::optional< ChunkUpdateStats > Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 633 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, CHECK_GT, Fragmenter_Namespace::ChunkUpdateStats::chunk, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), Fragmenter_Namespace::ChunkUpdateStats::fragment_rows_count, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::ChunkUpdateStats::new_values_stats, Fragmenter_Namespace::ChunkUpdateStats::old_values_stats, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, TableDescriptor::tableId, temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, Fragmenter_Namespace::anonymous_namespace{UpdelStorage.cpp}::update_metadata(), foreign_storage::update_stats(), updateColumnMetadata(), Fragmenter_Namespace::ChunkUpdateStats::updated_rows_count, NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

642  {
643  updel_roll.catalog = catalog;
644  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
645  updel_roll.memoryLevel = memory_level;
646 
647  const size_t ncore = cpu_threads();
648  const auto nrow = frag_offsets.size();
649  const auto n_rhs_values = rhs_values.size();
650  if (0 == nrow) {
651  return {};
652  }
653  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
654 
655  auto fragment_ptr = getFragmentInfo(fragment_id);
656  auto& fragment = *fragment_ptr;
657  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
658  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
659  ChunkKey chunk_key{
660  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
661  auto chunk = Chunk_NS::Chunk::getChunk(cd,
662  &catalog->getDataMgr(),
663  chunk_key,
664  memory_level,
665  0,
666  chunk_meta_it->second->numBytes,
667  chunk_meta_it->second->numElements);
668 
669  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
670 
671  // parallel update elements
672  std::vector<std::future<void>> threads;
673 
674  const auto segsz = (nrow + ncore - 1) / ncore;
675  auto dbuf = chunk->getBuffer();
676  auto dbuf_addr = dbuf->getMemoryPtr();
677  dbuf->setUpdated();
678  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
679  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
680  threads.emplace_back(std::async(
681  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
682  SQLTypeInfo lhs_type = cd->columnType;
683 
684  // !! not sure if this is an undocumented convention or a bug, but for a sharded
685  // table the dictionary id of an encoded string column is not specified by
686  // comp_param in physical table but somehow in logical table :) comp_param in
687  // physical table is always 0, so need to adapt accordingly...
688  auto cdl = (shard_ < 0)
689  ? cd
690  : catalog->getMetadataForColumn(
691  catalog->getLogicalTableId(td->tableId), cd->columnId);
692  CHECK(cdl);
693  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
694  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
695  lhs_type, &decimalOverflowValidator);
696  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
697  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
698  lhs_type, &dateDaysOverflowValidator);
699 
700  StringDictionary* stringDict{nullptr};
701  if (lhs_type.is_string()) {
702  CHECK(kENCODING_DICT == lhs_type.get_compression());
703  auto dictDesc = const_cast<DictDescriptor*>(
704  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
705  CHECK(dictDesc);
706  stringDict = dictDesc->stringDict.get();
707  CHECK(stringDict);
708  }
709 
710  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
711  const auto roffs = frag_offsets[r];
712  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
713  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
714  ScalarTargetValue sv2;
715 
716  // Subtle here is on the two cases of string-to-string assignments, when
717  // upstream passes RHS string as a string index instead of a preferred "real
718  // string".
719  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
720  // index
721  // in this layer, so if upstream passes a str idx here, an
722  // exception is thrown.
723  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
724  // str idx.
725  if (rhs_type.is_string()) {
726  if (const auto vp = boost::get<int64_t>(sv)) {
727  auto dictDesc = const_cast<DictDescriptor*>(
728  catalog->getMetadataForDict(rhs_type.get_comp_param()));
729  if (nullptr == dictDesc) {
730  throw std::runtime_error(
731  "UPDATE does not support cast from string literal to string "
732  "column.");
733  }
734  auto stringDict = dictDesc->stringDict.get();
735  CHECK(stringDict);
736  sv2 = NullableString(stringDict->getString(*vp));
737  sv = &sv2;
738  }
739  }
740 
741  if (const auto vp = boost::get<int64_t>(sv)) {
742  auto v = *vp;
743  if (lhs_type.is_string()) {
744  throw std::runtime_error("UPDATE does not support cast to string.");
745  }
746  int64_t old_val;
747  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
748  // Handle special case where date column with date in days encoding stores
749  // metadata in epoch seconds.
750  if (lhs_type.is_date_in_days()) {
751  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
752  }
753  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
754  if (lhs_type.is_decimal()) {
755  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
756  int64_t decimal_val;
757  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
758  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
759  lhs_type.get_notnull() == false)
760  ? v
761  : decimal_val;
762  update_metadata(
763  lhs_type, update_stats_per_thread[c], target_value, old_val);
764  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
765  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
766  if (positive_v_and_negative_d || negative_v_and_positive_d) {
767  throw std::runtime_error(
768  "Data conversion overflow on " + std::to_string(v) +
769  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
770  std::to_string(rhs_type.get_scale()) + ") to (" +
771  std::to_string(lhs_type.get_dimension()) + ", " +
772  std::to_string(lhs_type.get_scale()) + ")");
773  }
774  } else if (is_integral(lhs_type)) {
775  if (lhs_type.is_date_in_days()) {
776  // Store meta values in seconds
777  if (lhs_type.get_size() == 2) {
778  nullAwareDateOverflowValidator.validate<int16_t>(v);
779  } else {
780  nullAwareDateOverflowValidator.validate<int32_t>(v);
781  }
782  int64_t days;
783  get_scalar<int64_t>(data_ptr, lhs_type, days);
784  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
785  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
786  lhs_type.get_notnull() == false)
787  ? NullSentinelSupplier()(lhs_type, v)
788  : seconds;
789  update_metadata(
790  lhs_type, update_stats_per_thread[c], target_value, old_val);
791  } else {
792  int64_t target_value;
793  if (rhs_type.is_decimal()) {
794  target_value = round(decimal_to_double(rhs_type, v));
795  } else {
796  target_value = v;
797  }
798  update_metadata(
799  lhs_type, update_stats_per_thread[c], target_value, old_val);
800  }
801  } else {
802  if (rhs_type.is_decimal()) {
803  update_metadata(lhs_type,
804  update_stats_per_thread[c],
805  decimal_to_double(rhs_type, v),
806  double(old_val));
807  } else {
808  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
809  }
810  }
811  } else if (const auto vp = boost::get<double>(sv)) {
812  auto v = *vp;
813  if (lhs_type.is_string()) {
814  throw std::runtime_error("UPDATE does not support cast to string.");
815  }
816  double old_val;
817  get_scalar<double>(data_ptr, lhs_type, old_val);
818  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
819  if (lhs_type.is_integer()) {
820  update_metadata(
821  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
822  } else if (lhs_type.is_fp()) {
823  update_metadata(
824  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
825  } else {
826  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
827  "LHS with a floating RHS.";
828  }
829  } else if (const auto vp = boost::get<float>(sv)) {
830  auto v = *vp;
831  if (lhs_type.is_string()) {
832  throw std::runtime_error("UPDATE does not support cast to string.");
833  }
834  float old_val;
835  get_scalar<float>(data_ptr, lhs_type, old_val);
836  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
837  if (lhs_type.is_integer()) {
838  update_metadata(
839  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
840  } else {
841  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
842  }
843  } else if (const auto vp = boost::get<NullableString>(sv)) {
844  const auto s = boost::get<std::string>(vp);
845  const auto sval = s ? *s : std::string("");
846  if (lhs_type.is_string()) {
847  decltype(stringDict->getOrAdd(sval)) sidx;
848  {
849  std::unique_lock<std::mutex> lock(temp_mutex_);
850  sidx = stringDict->getOrAdd(sval);
851  }
852  int64_t old_val;
853  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
854  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
855  update_metadata(
856  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
857  } else if (sval.size() > 0) {
858  auto dval = std::atof(sval.data());
859  if (lhs_type.is_boolean()) {
860  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
861  } else if (lhs_type.is_time()) {
862  throw std::runtime_error(
863  "Date/Time/Timestamp update not supported through translated "
864  "string path.");
865  }
866  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
867  double old_val;
868  get_scalar<double>(data_ptr, lhs_type, old_val);
869  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
870  update_metadata(
871  lhs_type, update_stats_per_thread[c], double(dval), old_val);
872  } else {
873  int64_t old_val;
874  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
875  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
876  update_metadata(
877  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
878  }
879  } else {
880  put_null(data_ptr, lhs_type, cd->columnName);
881  update_stats_per_thread[c].new_values_stats.has_null = true;
882  }
883  } else {
884  CHECK(false);
885  }
886  }
887  }));
888  if (threads.size() >= (size_t)cpu_threads()) {
889  wait_cleanup_threads(threads);
890  }
891  }
892  wait_cleanup_threads(threads);
893 
894  // for unit test
895  if (fragment.unconditionalVacuum_) {
896  if (cd->isDeletedCol) {
897  const auto deleted_offsets = getVacuumOffsets(chunk);
898  if (deleted_offsets.size() > 0) {
899  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
900  return {};
901  }
902  }
903  }
904  ChunkUpdateStats update_stats;
905  for (size_t c = 0; c < ncore; ++c) {
906  update_metadata(update_stats.new_values_stats,
907  update_stats_per_thread[c].new_values_stats);
908  update_metadata(update_stats.old_values_stats,
909  update_stats_per_thread[c].old_values_stats);
910  }
911 
912  CHECK_GT(fragment.shadowNumTuples, size_t(0));
913  updateColumnMetadata(
914  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
915  update_stats.updated_rows_count = nrow;
916  update_stats.fragment_rows_count = fragment.shadowNumTuples;
917  update_stats.chunk = chunk;
918  return update_stats;
919 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:54
std::vector< int > ChunkKey
Definition: types.h:37
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:229
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
#define UNREACHABLE()
Definition: Logger.h:255
#define CHECK_GT(x, y)
Definition: Logger.h:223
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:228
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4289
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1579
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
int logicalTableId
Definition: UpdelRoll.h:53
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:211
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
Descriptor for a dictionary for a string column.
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:30
bool is_string() const
Definition: sqltypes.h:519
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:522
std::string columnName
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
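
The per-row work above is parallelized by cutting frag_offsets into segments of segsz = (nrow + ncore - 1) / ncore rows, launching one std::async task per segment, and merging the per-thread ChunkUpdateStats once all tasks have finished. A standalone sketch of that partitioning over a plain vector (the "update" and the stats are deliberately trivial):

#include <algorithm>
#include <cstddef>
#include <future>
#include <iostream>
#include <vector>

int main() {
  std::vector<long> column(1000, 1);  // stand-in for the chunk buffer
  const std::size_t nrow = column.size();
  const std::size_t ncore = 4;
  const std::size_t segsz = (nrow + ncore - 1) / ncore;  // ceil(nrow / ncore)

  std::vector<long> per_thread_max(ncore, 0);
  std::vector<std::future<void>> threads;
  for (std::size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
    threads.emplace_back(
        std::async(std::launch::async, [=, &column, &per_thread_max] {
          const std::size_t rend = std::min(rbegin + segsz, nrow);
          for (std::size_t r = rbegin; r < rend; ++r) {
            column[r] = static_cast<long>(r);  // the "update"
            per_thread_max[c] = std::max(per_thread_max[c], column[r]);
          }
        }));
  }
  for (auto& thread : threads) {
    thread.wait();
  }

  long merged_max = 0;  // merge per-thread stats, as updateColumn() does
  for (const long thread_max : per_thread_max) {
    merged_max = std::max(merged_max, thread_max);
  }
  std::cout << "max after update: " << merged_max << "\n";  // 999
  return 0;
}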


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue rhs_value,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 54 of file UpdelStorage.cpp.

References updateColumn().

62  {
63  updateColumn(catalog,
64  td,
65  cd,
66  fragment_id,
67  frag_offsets,
68  std::vector<ScalarTargetValue>(1, rhs_value),
69  rhs_type,
70  memory_level,
71  updel_roll);
72 }
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata metadata 
)
overridevirtual

Updates the metadata for a column chunk.

Parameters
cd- ColumnDescriptor for the column
fragment_id- Fragment id of the chunk within the column
metadata- shared_ptr of the metadata to update column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 301 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

304  {
305  // synchronize concurrent accesses to fragmentInfoVec_
306  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
307 
308  CHECK(metadata.get());
309  auto fragment_info = getFragmentInfo(fragment_id);
310  CHECK(fragment_info);
311  fragment_info->setChunkMetadata(cd->columnId, metadata);
312 }
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:211
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor cd,
FragmentInfo fragment,
std::shared_ptr< Chunk_NS::Chunk chunk,
const UpdateValuesStats update_values_stats,
const SQLTypeInfo rhs_type,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 921 of file UpdelStorage.cpp.

References UpdelRoll::catalog, ColumnDescriptor::columnId, ColumnDescriptor::columnType, fragmentInfoMutex_, UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getMetadataForTable(), Fragmenter_Namespace::UpdateValuesStats::has_null, SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, Fragmenter_Namespace::UpdateValuesStats::max_double, Fragmenter_Namespace::UpdateValuesStats::max_int64t, Fragmenter_Namespace::UpdateValuesStats::min_double, Fragmenter_Namespace::UpdateValuesStats::min_int64t, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

927  {
928  mapd_unique_lock<mapd_shared_mutex> write_lock(fragmentInfoMutex_);
929  auto buffer = chunk->getBuffer();
930  const auto& lhs_type = cd->columnType;
931 
932  auto encoder = buffer->getEncoder();
933  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
934  static_assert(std::is_same<decltype(min), decltype(max)>::value,
935  "Type mismatch on min/max");
936  if (has_null) {
937  encoder->updateStats(decltype(min)(), true);
938  }
939  if (max < min) {
940  return;
941  }
942  encoder->updateStats(min, false);
943  encoder->updateStats(max, false);
944  };
945 
946  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
947  update_stats(new_values_stats.min_int64t,
948  new_values_stats.max_int64t,
949  new_values_stats.has_null);
950  } else if (lhs_type.is_fp()) {
951  update_stats(new_values_stats.min_double,
952  new_values_stats.max_double,
953  new_values_stats.has_null);
954  } else if (lhs_type.is_decimal()) {
955  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
956  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
957  new_values_stats.has_null);
958  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
959  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
960  update_stats(new_values_stats.min_int64t,
961  new_values_stats.max_int64t,
962  new_values_stats.has_null);
963  }
964  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
965  auto chunk_metadata =
966  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
967  buffer->getEncoder()->getMetadata(chunk_metadata);
968 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
mapd_unique_lock< mapd_shared_mutex > write_lock
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:522
bool is_integral(const SQLTypeInfo &t)
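
The generic update_stats lambda above records the null flag first and skips the min/max update when max < min, which is the sentinel state of a per-thread stats struct that never saw a non-null value. A simplified standalone analog; MiniEncoder is a hypothetical stand-in for the Encoder interface:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>

struct MiniEncoder {
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();
  bool has_nulls = false;

  void updateStats(int64_t value, bool is_null) {
    if (is_null) {
      has_nulls = true;
      return;
    }
    min = std::min(min, value);
    max = std::max(max, value);
  }
};

// Apply a batch of new-value stats to the encoder, skipping min/max when the
// batch contains no non-null values (signalled by max < min).
void apply_update_stats(MiniEncoder& encoder,
                        int64_t new_min,
                        int64_t new_max,
                        bool new_has_null) {
  if (new_has_null) {
    encoder.updateStats(int64_t{0}, true);  // record the null first
  }
  if (new_max < new_min) {
    return;  // no non-null values were written; leave min/max untouched
  }
  encoder.updateStats(new_min, false);
  encoder.updateStats(new_max, false);
}

int main() {
  MiniEncoder encoder;
  apply_update_stats(encoder, 5, 42, false);
  std::cout << encoder.min << " " << encoder.max << "\n";  // 5 42
  return 0;
}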


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const int  fragmentId,
const std::vector< TargetMetaInfo sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll updelRoll,
Executor executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 267 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::InsertData::columnIds, cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), i, insertDataNoCheckpoint(), Fragmenter_Namespace::InsertData::is_default, SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

277  {
278  updelRoll.is_varlen_update = true;
279  updelRoll.catalog = catalog;
280  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
281  updelRoll.memoryLevel = memoryLevel;
282 
283  size_t num_entries = sourceDataProvider.getEntryCount();
284  size_t num_rows = sourceDataProvider.getRowCount();
285 
286  if (0 == num_rows) {
287  // bail out early
288  return;
289  }
290 
291  TargetValueConverterFactory factory;
292 
293  auto fragment_ptr = getFragmentInfo(fragmentId);
294  auto& fragment = *fragment_ptr;
295  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
296  get_chunks(catalog, td, fragment, memoryLevel, chunks);
297  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
298  columnDescriptors.size());
299  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
300  size_t indexOfDeletedColumn{0};
301  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
302  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
303  auto chunk = chunks[indexOfChunk];
304  const auto chunk_cd = chunk->getColumnDesc();
305 
306  if (chunk_cd->isDeletedCol) {
307  indexOfDeletedColumn = chunk_cd->columnId;
308  deletedChunk = chunk;
309  continue;
310  }
311 
312  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
313  columnDescriptors.end(),
314  [=](const ColumnDescriptor* cd) -> bool {
315  return cd->columnId == chunk_cd->columnId;
316  });
317 
318  if (targetColumnIt != columnDescriptors.end()) {
319  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
320 
321  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
322  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
323 
324  ConverterCreateParameter param{
325  num_rows,
326  *catalog,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  targetDescriptor->columnType,
330  !targetDescriptor->columnType.get_notnull(),
331  sourceDataProvider.getLiteralDictionary(),
333  ? executor->getStringDictionaryProxy(
334  sourceDataMetaInfo.get_type_info().get_comp_param(),
335  executor->getRowSetMemoryOwner(),
336  true)
337  : nullptr};
338  auto converter = factory.create(param);
339  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
340 
341  if (targetDescriptor->columnType.is_geometry()) {
342  // geometry columns are composites
343  // need to skip chunks, depending on geo type
344  switch (targetDescriptor->columnType.get_type()) {
345  case kMULTIPOLYGON:
346  indexOfChunk += 5;
347  break;
348  case kPOLYGON:
349  indexOfChunk += 4;
350  break;
351  case kLINESTRING:
352  indexOfChunk += 2;
353  break;
354  case kPOINT:
355  indexOfChunk += 1;
356  break;
357  default:
358  CHECK(false); // not supported
359  }
360  }
361  } else {
362  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
363  std::unique_ptr<ChunkToInsertDataConverter> converter;
364 
365  if (chunk_cd->columnType.is_fixlen_array()) {
366  converter =
367  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
368  } else if (chunk_cd->columnType.is_string()) {
369  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
370  } else if (chunk_cd->columnType.is_geometry()) {
371  // the logical geo column is a string column
372  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
373  } else {
374  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
375  }
376 
377  chunkConverters.push_back(std::move(converter));
378 
379  } else if (chunk_cd->columnType.is_date_in_days()) {
380  /* Q: Why do we need this?
381  A: In the variable-length update path we move the chunk content of a column
382  without decoding it. Since it passes through DateDaysEncoder again,
383  the expected value should be in seconds, but here it would be in days.
384  Therefore, DateChunkConverter scales the chunk values to seconds,
385  which are then ultimately encoded as days by DateDaysEncoder.
386  */
387  std::unique_ptr<ChunkToInsertDataConverter> converter;
388  const size_t physical_size = chunk_cd->columnType.get_size();
389  if (physical_size == 2) {
390  converter =
391  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
392  } else if (physical_size == 4) {
393  converter =
394  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
395  } else {
396  CHECK(false);
397  }
398  chunkConverters.push_back(std::move(converter));
399  } else {
400  std::unique_ptr<ChunkToInsertDataConverter> converter;
401  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
402  int logical_size = logical_type.get_size();
403  int physical_size = chunk_cd->columnType.get_size();
404 
405  if (logical_type.is_string()) {
406  // for dicts -> logical = physical
407  logical_size = physical_size;
408  }
409 
410  if (8 == physical_size) {
411  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
412  num_rows, chunk.get());
413  } else if (4 == physical_size) {
414  if (8 == logical_size) {
415  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
416  num_rows, chunk.get());
417  } else {
418  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
419  num_rows, chunk.get());
420  }
421  } else if (2 == chunk_cd->columnType.get_size()) {
422  if (8 == logical_size) {
423  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
424  num_rows, chunk.get());
425  } else if (4 == logical_size) {
426  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
427  num_rows, chunk.get());
428  } else {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
430  num_rows, chunk.get());
431  }
432  } else if (1 == chunk_cd->columnType.get_size()) {
433  if (8 == logical_size) {
434  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
435  num_rows, chunk.get());
436  } else if (4 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
438  num_rows, chunk.get());
439  } else if (2 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
441  num_rows, chunk.get());
442  } else {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
444  num_rows, chunk.get());
445  }
446  } else {
447  CHECK(false); // unknown
448  }
449 
450  chunkConverters.push_back(std::move(converter));
451  }
452  }
453  }
454 
455  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
456  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
457 
458  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
459  bool* deletedChunkBuffer =
460  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
461 
462  std::atomic<size_t> row_idx{0};
463 
464  auto row_converter = [&sourceDataProvider,
465  &sourceDataConverters,
466  &indexOffFragmentOffsetColumn,
467  &chunkConverters,
468  &deletedChunkBuffer,
469  &row_idx](size_t indexOfEntry) -> void {
470  // convert the source data
471  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
472  if (row.empty()) {
473  return;
474  }
475 
476  size_t indexOfRow = row_idx.fetch_add(1);
477 
478  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
479  if (sourceDataConverters[col]) {
480  const auto& mapd_variant = row[col];
481  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
482  }
483  }
484 
485  auto scalar = checked_get(
486  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
487  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
488 
489  // convert the remaining chunks
490  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
491  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
492  }
493 
494  // now mark the row as deleted
495  deletedChunkBuffer[indexInChunkBuffer] = true;
496  };
497 
498  bool can_go_parallel = num_rows > 20000;
499 
500  if (can_go_parallel) {
501  const size_t num_worker_threads = cpu_threads();
502  std::vector<std::future<void>> worker_threads;
503  for (size_t i = 0,
504  start_entry = 0,
505  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
506  i < num_worker_threads && start_entry < num_entries;
507  ++i, start_entry += stride) {
508  const auto end_entry = std::min(start_entry + stride, num_rows);
509  worker_threads.push_back(std::async(
510  std::launch::async,
511  [&row_converter](const size_t start, const size_t end) {
512  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
513  row_converter(indexOfRow);
514  }
515  },
516  start_entry,
517  end_entry));
518  }
519 
520  for (auto& child : worker_threads) {
521  child.wait();
522  }
523 
524  } else {
525  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
526  row_converter(entryIdx);
527  }
528  }
529 
530  InsertData insert_data;
531  insert_data.databaseId = catalog->getCurrentDB().dbId;
532  insert_data.tableId = td->tableId;
533 
534  for (size_t i = 0; i < chunkConverters.size(); i++) {
535  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
536  continue;
537  }
538 
539  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
540  if (sourceDataConverters[i]) {
541  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
542  }
543  continue;
544  }
545 
546  insert_data.numRows = num_rows;
547  insert_data.is_default.resize(insert_data.columnIds.size(), false);
548  insertDataNoCheckpoint(insert_data);
549 
550  // update metadata for deleted chunk as we are doing special handling
551  auto chunkMetadata =
552  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
553  chunkMetadata->chunkStats.max.boolval = 1;
554 
555  // I'm not completely sure that we need to do this in the fragmenter and on the buffer,
556  // but leaving this alone for now
557  if (!deletedChunk->getBuffer()->hasEncoder()) {
558  deletedChunk->initEncoder();
559  }
560  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
561 
562  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
563  // An append to the same fragment will increase shadowNumTuples.
564  // Update NumElems in this case. Otherwise, use existing NumElems.
565  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
566  }
567  deletedChunk->getBuffer()->setUpdated();
568 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:54
bool is_varlen_update
Definition: UpdelRoll.h:56
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1064
std::vector< bool > is_default
Definition: Fragmenter.h:73
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:68
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:70
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
future< Result > async(Fn &&fn, Args &&...args)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4289
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
bool g_enable_experimental_string_functions
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:53
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
#define CHECK(condition)
Definition: Logger.h:211
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:66
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool is_string() const
Definition: sqltypes.h:519
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:24
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:69
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
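
Because a geometry column is a composite (its logical chunk is a string column, followed by additional physical chunks), the converter loop above advances indexOfChunk by a per-type count after handling the logical column. A small helper mirroring that switch; the enum is a local stand-in for the SQL type tags used in the real code:

#include <cassert>
#include <cstddef>

enum class GeoType { kPoint, kLineString, kPolygon, kMultiPolygon };

// Number of physical chunks that follow the logical geo chunk and therefore
// have to be skipped by the update loop.
std::size_t physical_chunks_to_skip(GeoType type) {
  switch (type) {
    case GeoType::kMultiPolygon:
      return 5;
    case GeoType::kPolygon:
      return 4;
    case GeoType::kLineString:
      return 2;
    case GeoType::kPoint:
      return 1;
  }
  return 0;  // unreachable; the real code CHECKs instead
}

int main() {
  assert(physical_chunks_to_skip(GeoType::kPolygon) == 4);
  assert(physical_chunks_to_skip(GeoType::kPoint) == 1);
  return 0;
}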


void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog catalog,
const MetaDataKey key,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 970 of file UpdelStorage.cpp.

References fragmentInfoMutex_, UpdelRoll::getChunkMetadataMap(), and UpdelRoll::getNumTuple().

972  {
973  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
974  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
975  auto& fragmentInfo = *key.second;
976  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
977  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
978  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
979  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
980 }
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
size_t getNumTuple(const MetaDataKey &key) const


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1074 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1077  {
1078  const auto cd = chunk->getColumnDesc();
1079  const auto& col_type = cd->columnType;
1080  auto data_buffer = chunk->getBuffer();
1081  auto data_addr = data_buffer->getMemoryPtr();
1082  auto element_size =
1083  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1084  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1085  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1086  size_t nbytes_fix_data_to_keep = 0;
1087  auto nrows_to_vacuum = frag_offsets.size();
1088  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1089  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1090  auto is_last_one = irow == nrows_to_vacuum;
1091  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1092  auto maddr_to_vacuum = data_addr;
1093  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1094  if (nrows_to_keep > 0) {
1095  auto nbytes_to_keep = nrows_to_keep * element_size;
1096  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1097  // move curr fixlen row block toward front
1098  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1099  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1100  nbytes_to_keep);
1101  }
1102  irow_of_blk_to_fill += nrows_to_keep;
1103  nbytes_fix_data_to_keep += nbytes_to_keep;
1104  }
1105  irow_of_blk_to_keep = irow_to_vacuum + 1;
1106  }
1107  return nbytes_fix_data_to_keep;
1108 }
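
The routine walks the sorted deleted-row offsets and moves each run of surviving rows toward the front of the buffer as one memmove, returning the number of fixed-length bytes kept. A standalone sketch of the same block compaction over a vector of int32_t (deleted_rows must be sorted ascending):

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Compact a fixed-length column in place by dropping the rows listed in
// deleted_rows; returns the number of bytes of data kept.
std::size_t compact_fixlen(std::vector<int32_t>& rows,
                           const std::vector<std::size_t>& deleted_rows) {
  const std::size_t nrows = rows.size();
  std::size_t fill = 0;       // next row slot to fill
  std::size_t keep_from = 0;  // start of the next block of surviving rows
  for (std::size_t i = 0; i <= deleted_rows.size(); ++i) {
    const bool is_last = (i == deleted_rows.size());
    const std::size_t next_deleted = is_last ? nrows : deleted_rows[i];
    const std::size_t block_len = next_deleted - keep_from;
    if (block_len > 0 && fill != keep_from) {
      // move the surviving block toward the front, one memmove per block
      std::memmove(rows.data() + fill, rows.data() + keep_from,
                   block_len * sizeof(int32_t));
    }
    fill += block_len;
    keep_from = next_deleted + 1;  // skip the deleted row
  }
  rows.resize(fill);
  return fill * sizeof(int32_t);
}

int main() {
  std::vector<int32_t> rows{10, 11, 12, 13, 14, 15};
  compact_fixlen(rows, {1, 4});  // drop rows 1 and 4
  for (const int32_t value : rows) {
    std::cout << value << " ";   // prints: 10 12 13 15
  }
  std::cout << "\n";
  return 0;
}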


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1188 of file UpdelStorage.cpp.

References CHECK, Fragmenter_Namespace::get_buffer_offset(), Fragmenter_Namespace::get_null_padding(), Fragmenter_Namespace::get_var_len_null_array_indexes(), Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples(), and i.

Referenced by compactRows().

1191  {
1192  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1193  auto data_buffer = chunk->getBuffer();
1194  CHECK(data_buffer);
1195  auto index_buffer = chunk->getIndexBuf();
1196  CHECK(index_buffer);
1197  auto data_addr = data_buffer->getMemoryPtr();
1198  auto indices_addr = index_buffer->getMemoryPtr();
1199  CHECK(indices_addr);
1200  auto index_array = (StringOffsetT*)indices_addr;
1201  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1202  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1203  size_t nbytes_fix_data_to_keep = 0;
1204  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1205  size_t null_padding =
1206  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1207  size_t nbytes_var_data_to_keep = null_padding;
1208  auto null_array_indexes = get_var_len_null_array_indexes(
1209  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1210  auto nrows_to_vacuum = frag_offsets.size();
1211  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1212  auto is_last_one = irow == nrows_to_vacuum;
1213  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1214  auto maddr_to_vacuum = data_addr;
1215  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1216  if (nrows_to_keep > 0) {
1217  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1218  auto deleted_row_start_offset =
1219  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1220  auto kept_row_start_offset =
1221  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1222  auto nbytes_to_keep =
1223  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1224  kept_row_start_offset;
1225  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1226  if (nbytes_to_keep > 0) {
1227  CHECK(data_addr);
1228  // move curr varlen row block toward front
1229  memmove(data_addr + ibyte_var_data_to_keep,
1230  data_addr + kept_row_start_offset,
1231  nbytes_to_keep);
1232  }
1233 
1234  const auto base_offset = kept_row_start_offset;
1235  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1236  auto update_index = irow_of_blk_to_keep + i;
1237  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1238  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1239  }
1240  }
1241  nbytes_var_data_to_keep += nbytes_to_keep;
1242  maddr_to_vacuum = indices_addr;
1243 
1244  constexpr static auto index_element_size = sizeof(StringOffsetT);
1245  nbytes_to_keep = nrows_to_keep * index_element_size;
1246  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1247  // move curr fixlen row block toward front
1248  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1249  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1250  nbytes_to_keep);
1251  }
1252  irow_of_blk_to_fill += nrows_to_keep;
1253  nbytes_fix_data_to_keep += nbytes_to_keep;
1254  }
1255  irow_of_blk_to_keep = irow_to_vacuum + 1;
1256  }
1257 
1258  // Set expected null padding, last offset, and negative values for null array offsets.
1259  index_array[0] = null_padding;
1260  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1261  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1262  if (!is_varlen_array) {
1263  CHECK(null_array_indexes.empty());
1264  }
1265  for (auto index : null_array_indexes) {
1266  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1267  }
1268  return nbytes_var_data_to_keep;
1269 }
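
vacuum_varlen_rows() performs the analogous compaction for variable-length columns. Each row owns the byte range [index_array[i], index_array[i + 1]) in the data buffer, so surviving payload bytes are moved forward, the offsets of kept rows are rewritten to keep those ranges contiguous, and the offset (index) array itself is then packed like a fixed-width column; null padding and the negative-offset encoding of null arrays account for the extra bookkeeping at the end of the listing. Below is a simplified, hypothetical sketch of the core idea that ignores null handling; compact_varlen and its parameters are illustrative only.

#include <cstdint>
#include <cstring>
#include <vector>

using StringOffsetT = int32_t;

// Hypothetical, simplified illustration of the vacuum_varlen_rows() idea: row i owns
// bytes [offsets[i], offsets[i + 1]) of data. Surviving rows are packed to the front,
// their offsets rewritten to stay contiguous. Null padding and null-array offsets omitted.
size_t compact_varlen(int8_t* data,
                      std::vector<StringOffsetT>& offsets,      // size nrows + 1
                      size_t nrows,
                      const std::vector<uint64_t>& deleted) {   // sorted row offsets
  size_t write_byte = 0;  // next free byte in the packed data buffer
  size_t write_row = 0;   // next free slot in the packed offsets array
  size_t next_deleted = 0;
  std::vector<StringOffsetT> new_offsets(nrows + 1 - deleted.size());
  for (size_t row = 0; row < nrows; ++row) {
    if (next_deleted < deleted.size() && deleted[next_deleted] == row) {
      ++next_deleted;  // skip deleted rows entirely
      continue;
    }
    const size_t len = offsets[row + 1] - offsets[row];
    if (write_byte != static_cast<size_t>(offsets[row]) && len > 0) {
      // Move this row's payload left over the space freed by deleted rows.
      std::memmove(data + write_byte, data + offsets[row], len);
    }
    new_offsets[write_row++] = write_byte;
    write_byte += len;
  }
  new_offsets[write_row] = write_byte;  // closing offset of the last kept row
  offsets = std::move(new_offsets);
  return write_byte;  // bytes of variable-length payload retained
}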


Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to the Chunk holding that column's data

Definition at line 193 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 214 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 208 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 210 of file InsertOrderFragmenter.h.

Referenced by updateColumnMetadata(), and updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 195 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 216 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 212 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 207 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 219 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 217 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 200 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 244 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 215 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 218 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: