OmniSciDB  eb3a3d0a03
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
size_t getNumFragments () override
 returns the number of fragments in a table More...
 
TableInfo getFragmentsForQuery () override
 returns a TableInfo object containing the ids and row sizes of all fragments More...
 
void insertData (InsertData &insert_data_struct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insert_data_struct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
std::optional< ChunkUpdateStats > updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
void resetSizesFromFragments () override
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insert_data)
 
void addColumns (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Private Member Functions

bool isAddingNewColumns (const InsertData &insert_data) const
 
void dropFragmentsToSizeNoInsertLock (const size_t max_rows)
 
void setLastFragmentVarLenColumnSizes ()
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter.

InsertOrderFragmenter

Definition at line 53 of file InsertOrderFragmenter.h.

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
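
The fragmenter is normally constructed for a table by the catalog/storage layer, but the constructor's shape is easiest to see in a short sketch. The following is illustrative only: db_id, table_id, data_mgr (a Data_Namespace::DataMgr*) and catalog (a Catalog_Namespace::Catalog*) are assumed to already exist, and every sizing parameter falls back to the defaults shown in the signature above.

// Sketch only: constructing a fragmenter for one physical table.
std::vector<int> chunk_key_prefix{db_id, table_id};  // {database_id, table_id}
std::vector<Chunk_NS::Chunk> chunk_vec;              // one Chunk per column is added by the caller

Fragmenter_Namespace::InsertOrderFragmenter fragmenter(chunk_key_prefix,
                                                       chunk_vec,
                                                       data_mgr,
                                                       catalog,
                                                       table_id,
                                                       /*shard=*/-1);
// maxFragmentRows, maxChunkSize, pageSize, maxRows, defaultInsertLevel and
// uses_foreign_storage all take the default values listed in the signature above.
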
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 95 of file InsertOrderFragmenter.cpp.

95 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter &  )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::addColumns ( const InsertData &  insertDataStruct)
protected

Definition at line 436 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, i, and Fragmenter_Namespace::InsertData::numRows.

436  {
437  // synchronize concurrent accesses to fragmentInfoVec_
438  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
439  size_t numRowsLeft = insertDataStruct.numRows;
440  for (const auto columnId : insertDataStruct.columnIds) {
441  CHECK(columnMap_.end() == columnMap_.find(columnId));
442  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
443  CHECK(columnDesc);
444  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
445  }
446  try {
447  for (auto const& fragmentInfo : fragmentInfoVec_) {
448  fragmentInfo->shadowChunkMetadataMap =
449  fragmentInfo->getChunkMetadataMapPhysicalCopy();
450  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
451  size_t numRowsCanBeInserted;
452  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
453  auto columnId = insertDataStruct.columnIds[i];
454  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
455  CHECK(colDesc);
456  CHECK(columnMap_.find(columnId) != columnMap_.end());
457 
458  ChunkKey chunkKey = chunkKeyPrefix_;
459  chunkKey.push_back(columnId);
460  chunkKey.push_back(fragmentInfo->fragmentId);
461 
462  auto colMapIt = columnMap_.find(columnId);
463  auto& chunk = colMapIt->second;
464  if (chunk.isChunkOnDevice(
465  dataMgr_,
466  chunkKey,
467  defaultInsertLevel_,
468  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
469  dataMgr_->deleteChunksWithPrefix(chunkKey);
470  }
471  chunk.createChunkBuffer(
472  dataMgr_,
473  chunkKey,
474  defaultInsertLevel_,
475  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
476  chunk.initEncoder();
477 
478  try {
479  DataBlockPtr dataCopy = insertDataStruct.data[i];
480  auto size = colDesc->columnType.get_size();
481  if (0 > size) {
482  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
483  varLenColInfo_[columnId] = 0;
484  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
485  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
486  } else {
487  numRowsCanBeInserted = maxChunkSize_ / size;
488  }
489 
490  // FIXME: abort a case in which new column is wider than existing columns
491  if (numRowsCanBeInserted < numRowsToInsert) {
492  throw std::runtime_error("new column '" + colDesc->columnName +
493  "' wider than existing columns is not supported");
494  }
495 
496  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
497  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
498 
499  // update total size of var-len column in (actually the last) fragment
500  if (0 > size) {
501  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
502  varLenColInfo_[columnId] = chunk.getBuffer()->size();
503  }
504  } catch (...) {
505  dataMgr_->deleteChunksWithPrefix(chunkKey);
506  throw;
507  }
508  }
509  numRowsLeft -= numRowsToInsert;
510  }
511  CHECK(0 == numRowsLeft);
512  } catch (const std::exception& e) {
513  for (const auto columnId : insertDataStruct.columnIds) {
514  columnMap_.erase(columnId);
515  }
516  throw e;
517  }
518 
519  for (auto const& fragmentInfo : fragmentInfoVec_) {
520  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
521  }
522 }
std::vector< int > ChunkKey
Definition: types.h:37
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:442
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1271 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), getChunksForAllColumns(), getFragmentInfo(), Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), UpdelRoll::setNumTuple(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1276  {
1277  auto fragment_ptr = getFragmentInfo(fragment_id);
1278  auto& fragment = *fragment_ptr;
1279  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1280  const auto ncol = chunks.size();
1281 
1282  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1283 
1284  // parallel delete columns
1285  std::vector<std::future<void>> threads;
1286  auto nrows_to_vacuum = frag_offsets.size();
1287  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1288  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1289 
1290  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1291  auto chunk = chunks[ci];
1292  const auto cd = chunk->getColumnDesc();
1293  const auto& col_type = cd->columnType;
1294  auto data_buffer = chunk->getBuffer();
1295  auto index_buffer = chunk->getIndexBuf();
1296  auto data_addr = data_buffer->getMemoryPtr();
1297  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1298  auto index_array = (StringOffsetT*)indices_addr;
1299  bool is_varlen = col_type.is_varlen_indeed();
1300 
1301  auto fixlen_vacuum =
1302  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1303  size_t nbytes_fix_data_to_keep;
1304  if (nrows_to_keep == 0) {
1305  nbytes_fix_data_to_keep = 0;
1306  } else {
1307  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1308  }
1309 
1310  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1311  data_buffer->setSize(nbytes_fix_data_to_keep);
1312  data_buffer->setUpdated();
1313 
1314  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1315 
1316  auto daddr = data_addr;
1317  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1318  : get_element_size(col_type);
1319  data_buffer->getEncoder()->resetChunkStats();
1320  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1321  if (col_type.is_fixlen_array()) {
1322  auto encoder =
1323  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1324  CHECK(encoder);
1325  encoder->updateMetadata((int8_t*)daddr);
1326  } else if (col_type.is_fp()) {
1327  set_chunk_stats(col_type,
1328  daddr,
1329  update_stats_per_thread[ci].new_values_stats.has_null,
1330  update_stats_per_thread[ci].new_values_stats.min_double,
1331  update_stats_per_thread[ci].new_values_stats.max_double);
1332  } else {
1333  set_chunk_stats(col_type,
1334  daddr,
1335  update_stats_per_thread[ci].new_values_stats.has_null,
1336  update_stats_per_thread[ci].new_values_stats.min_int64t,
1337  update_stats_per_thread[ci].new_values_stats.max_int64t);
1338  }
1339  }
1340  };
1341 
1342  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1343  size_t nbytes_var_data_to_keep;
1344  if (nrows_to_keep == 0) {
1345  nbytes_var_data_to_keep = 0;
1346  } else {
1347  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1348  }
1349 
1350  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1351  data_buffer->setSize(nbytes_var_data_to_keep);
1352  data_buffer->setUpdated();
1353 
1354  index_buffer->setSize(sizeof(*index_array) *
1355  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1356  index_buffer->setUpdated();
1357 
1358  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1359  };
1360 
1361  if (is_varlen) {
1362  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1363  } else {
1364  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1365  }
1366  if (threads.size() >= (size_t)cpu_threads()) {
1367  wait_cleanup_threads(threads);
1368  }
1369  }
1370 
1371  wait_cleanup_threads(threads);
1372 
1373  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1374  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1375  auto chunk = chunks[ci];
1376  auto cd = chunk->getColumnDesc();
1377  if (!cd->columnType.is_fixlen_array()) {
1378  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1379  // stored in seconds. Do the metadata conversion here before updating the chunk
1380  // stats.
1381  if (cd->columnType.is_date_in_days()) {
1382  auto& stats = update_stats_per_thread[ci].new_values_stats;
1383  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1384  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1385  }
1386  updateColumnMetadata(cd,
1387  fragment,
1388  chunk,
1389  update_stats_per_thread[ci].new_values_stats,
1390  cd->columnType,
1391  updel_roll);
1392  }
1393  }
1394 }
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:957
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:209
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
int cpu_threads()
Definition: thread_count.h:24
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
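
Together with getVacuumOffsets() (documented further below), this is the vacuum path for a fragment: the delete-flag column's chunk is scanned for set flags, and the resulting row offsets are then compacted out of every column. A rough sketch of that flow follows; it assumes the caller already holds the required table locks and has fragmenter, catalog, td, fragment_id, updel_roll and the fragment's delete-flag chunk (delete_chunk) in scope, all of which are illustrative names rather than part of this API.

// Sketch only: vacuuming one fragment (inputs assumed to be set up by the caller).
const auto deleted_offsets = fragmenter.getVacuumOffsets(delete_chunk);
if (!deleted_offsets.empty()) {
  fragmenter.compactRows(catalog,
                         td,
                         fragment_id,
                         deleted_offsets,
                         Data_Namespace::CPU_LEVEL,
                         updel_roll);
  fragmenter.resetSizesFromFragments();  // refresh numTuples_ from the shrunken fragments
}
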


void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 115 of file InsertOrderFragmenter.cpp.

References catalog_(), Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

115  {
116  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
117  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
118  // storage per table
119  if (!uses_foreign_storage_ &&
120  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
121  const TableDescriptor* td =
122  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
123  File_Namespace::FileMgrParams fileMgrParams;
124  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
125  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
126  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
127  }
128 }
int32_t maxRollbackEpochs
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:549
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.


FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 711 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

712  {
713  // also sets the new fragment as the insertBuffer for each column
714 
715  maxFragmentId_++;
716  auto newFragmentInfo = std::make_unique<FragmentInfo>();
717  newFragmentInfo->fragmentId = maxFragmentId_;
718  newFragmentInfo->shadowNumTuples = 0;
719  newFragmentInfo->setPhysicalNumTuples(0);
720  for (const auto levelSize : dataMgr_->levelSizes_) {
721  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
722  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
723  }
724  newFragmentInfo->physicalTableId = physicalTableId_;
725  newFragmentInfo->shard = shard_;
726 
727  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
728  colMapIt != columnMap_.end();
729  ++colMapIt) {
730  ChunkKey chunkKey = chunkKeyPrefix_;
731  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
732  chunkKey.push_back(maxFragmentId_);
733  colMapIt->second.createChunkBuffer(
734  dataMgr_,
735  chunkKey,
736  memoryLevel,
737  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
738  pageSize_);
739  colMapIt->second.initEncoder();
740  }
741 
742  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
743  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
744  return fragmentInfoVec_.back().get();
745 }
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< int > levelSizes_
Definition: DataMgr.h:213
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)


void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 235 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

235  {
236  // Fix a verified loophole on sharded logical table which is locked using logical
237  // tableId while it's its physical tables that can come here when fragments overflow
238  // during COPY. Locks on a logical table and its physical tables never intersect, which
239  // means potential races. It'll be an overkill to resolve a logical table to physical
240  // tables in DBHandler, ParseNode or other higher layers where the logical table is
241  // locked with Table Read/Write locks; it's easier to lock the logical table of its
242  // physical tables. A downside of this approach may be loss of parallel execution of
243  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
244  // operation, the loss seems not a big deal.
245  auto chunkKeyPrefix = chunkKeyPrefix_;
246  if (shard_ >= 0) {
247  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
248  }
249 
250  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
251  // SELECT and COPY may enter a deadlock
252  const auto delete_lock =
254 
255  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
256 
257  for (const auto fragId : dropFragIds) {
258  for (const auto& col : columnMap_) {
259  int colId = col.first;
260  vector<int> fragPrefix = chunkKeyPrefix_;
261  fragPrefix.push_back(colId);
262  fragPrefix.push_back(fragId);
263  dataMgr_->deleteChunksWithPrefix(fragPrefix);
264  }
265  }
266 }
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:155
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4246
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:442
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 524 of file InsertOrderFragmenter.cpp.

524  {
525  // prevent concurrent insert rows and drop column
526  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
527  // synchronize concurrent accesses to fragmentInfoVec_
528  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
529  for (auto const& fragmentInfo : fragmentInfoVec_) {
530  fragmentInfo->shadowChunkMetadataMap =
531  fragmentInfo->getChunkMetadataMapPhysicalCopy();
532  }
533 
534  for (const auto columnId : columnIds) {
535  auto cit = columnMap_.find(columnId);
536  if (columnMap_.end() != cit) {
537  columnMap_.erase(cit);
538  }
539 
540  vector<int> fragPrefix = chunkKeyPrefix_;
541  fragPrefix.push_back(columnId);
542  dataMgr_->deleteChunksWithPrefix(fragPrefix);
543 
544  for (const auto& fragmentInfo : fragmentInfoVec_) {
545  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
546  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
547  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
548  }
549  }
550  }
551  for (const auto& fragmentInfo : fragmentInfoVec_) {
552  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
553  }
554 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:442
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 203 of file InsertOrderFragmenter.cpp.

203  {
204  mapd_unique_lock<mapd_shared_mutex> insert_lock(insertMutex_);
205  dropFragmentsToSizeNoInsertLock(maxRows);
206 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock ( const size_t  max_rows)
private

Definition at line 208 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

208  {
209  // not safe to call from outside insertData
210  // b/c depends on insertLock around numTuples_
211 
212  // don't ever drop the only fragment!
213  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
214  return;
215  }
216 
217  if (numTuples_ > max_rows) {
218  size_t preNumTuples = numTuples_;
219  vector<int> dropFragIds;
220  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
221  while (numTuples_ > targetRows) {
222  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
223  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
224  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
225  fragmentInfoVec_.pop_front();
226  CHECK_GE(numTuples_, numFragTuples);
227  numTuples_ -= numFragTuples;
228  }
229  deleteFragments(dropFragIds);
230  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
231  << " post: " << numTuples_ << " maxRows: " << max_rows;
232  }
233 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:203
#define CHECK_GE(x, y)
Definition: Logger.h:222
#define CHECK_GT(x, y)
Definition: Logger.h:221
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
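
The truncation target is max_rows * DROP_FRAGMENT_FACTOR, so the loop overshoots slightly below max_rows and the next insert does not immediately trigger another drop; whole fragments are popped from the front of fragmentInfoVec_, i.e. the oldest data goes first. A small worked example, assuming (not reading from the source) a factor of 0.97 and three existing fragments of 400,000 rows each:

// Illustration only; the factor and fragment sizes below are assumed values.
// max_rows   = 1,000,000
// targetRows = 1,000,000 * 0.97 = 970,000
// numTuples_ = 3 * 400,000      = 1,200,000
// drop oldest fragment: numTuples_ = 800,000 <= 970,000, so the loop stops and
// deleteFragments() then removes exactly one fragment's chunks from storage.
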
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 115 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 130 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

130  {
131  if (uses_foreign_storage_ ||
132  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
133  // memory-resident tables won't have anything on disk
134  ChunkMetadataVector chunk_metadata;
135  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
136 
137  // data comes like this - database_id, table_id, column_id, fragment_id
138  // but lets sort by database_id, table_id, fragment_id, column_id
139 
140  int fragment_subkey_index = 3;
141  std::sort(chunk_metadata.begin(),
142  chunk_metadata.end(),
143  [&](const auto& pair1, const auto& pair2) {
144  return pair1.first[3] < pair2.first[3];
145  });
146 
147  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
148  ++chunk_itr) {
149  int cur_column_id = chunk_itr->first[2];
150  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
151 
152  if (fragmentInfoVec_.empty() ||
153  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
154  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
155  CHECK(new_fragment_info);
156  maxFragmentId_ = cur_fragment_id;
157  new_fragment_info->fragmentId = cur_fragment_id;
158  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
159  numTuples_ += new_fragment_info->getPhysicalNumTuples();
160  for (const auto level_size : dataMgr_->levelSizes_) {
161  new_fragment_info->deviceIds.push_back(
162  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
163  }
164  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
165  new_fragment_info->physicalTableId = physicalTableId_;
166  new_fragment_info->shard = shard_;
167  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
168  } else {
169  if (chunk_itr->second->numElements !=
170  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
171  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
172  std::to_string(physicalTableId_) + ", Column " +
173  std::to_string(cur_column_id) + ". Fragment Tuples: " +
174  std::to_string(
175  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
176  ", Chunk Tuples: " +
177  std::to_string(chunk_itr->second->numElements);
178  }
179  }
180  CHECK(fragmentInfoVec_.back().get());
181  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
182  }
183  }
184 
185  size_t maxFixedColSize = 0;
186 
187  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
188  auto size = colIt->second.getColumnDesc()->columnType.get_size();
189  if (size == -1) { // variable length
190  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
191  size = 8; // b/c we use this for string and array indices - gross to have magic
192  // number here
193  }
194  CHECK_GE(size, 0);
195  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
196  }
197 
198  // this is maximum number of rows assuming everything is fixed length
199  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
200  setLastFragmentVarLenColumnSizes();
201 }
std::vector< int > levelSizes_
Definition: DataMgr.h:213
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:203
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:222
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:416
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)
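
The last statement of the listing (source line 199) caps the requested fragment size so that the widest fixed-width column cannot exceed maxChunkSize_ within a single fragment. A worked example of that arithmetic, using assumed defaults of roughly 1 GB per chunk and 32M requested rows (illustrative values, not read from this header):

#include <algorithm>  // std::min
#include <cstddef>

// Illustration only; the constants below are assumed values.
std::size_t max_chunk_size = 1073741824;   // ~1 GB per chunk (assumed default)
std::size_t max_fragment_rows = 32000000;  // requested rows per fragment (assumed default)
std::size_t max_fixed_col_size = 8;        // widest fixed-width column, e.g. a BIGINT

std::size_t cap = max_chunk_size / max_fixed_col_size;   // 134,217,728 rows
max_fragment_rows = std::min(max_fragment_rows, cap);    // unchanged: 32,000,000
// A 256-byte fixed-length array column would lower the cap to 4,194,304 rows,
// and maxFragmentRows_ would be reduced to that value instead.
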


auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 982 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

985  {
986  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
987  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
988  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
989  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
990  ++ncol;
991  if (!cd->isVirtualCol) {
992  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
993  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
994  ChunkKey chunk_key{
995  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
996  auto chunk = Chunk_NS::Chunk::getChunk(cd,
997  &catalog_->getDataMgr(),
998  chunk_key,
999  memory_level,
1000  0,
1001  chunk_meta_it->second->numBytes,
1002  chunk_meta_it->second->numElements);
1003  chunks.push_back(chunk);
1004  }
1005  }
1006  }
1007  return chunks;
1008 }
std::vector< int > ChunkKey
Definition: types.h:37
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:209


int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 114 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

114 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 119 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 372 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

372  {
373  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
374  fragmentInfoVec_.end(),
375  [fragment_id](const auto& fragment) -> bool {
376  return fragment->fragmentId == fragment_id;
377  });
378  CHECK(fragment_it != fragmentInfoVec_.end());
379  return fragment_it->get();
380 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:209


FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns a TableInfo object containing the ids and row sizes of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 752 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

752  {
753  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
754  TableInfo queryInfo;
755  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
756  // right now we don't test predicate, so just return (copy of) all fragments
757  bool fragmentsExist = false;
758  if (fragmentInfoVec_.empty()) {
759  // If we have no fragments add a dummy empty fragment to make the executor
760  // not have separate logic for 0-row tables
761  int maxFragmentId = 0;
762  FragmentInfo emptyFragmentInfo;
763  emptyFragmentInfo.fragmentId = maxFragmentId;
764  emptyFragmentInfo.shadowNumTuples = 0;
765  emptyFragmentInfo.setPhysicalNumTuples(0);
766  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
767  emptyFragmentInfo.physicalTableId = physicalTableId_;
768  emptyFragmentInfo.shard = shard_;
769  queryInfo.fragments.push_back(emptyFragmentInfo);
770  } else {
771  fragmentsExist = true;
772  std::for_each(
773  fragmentInfoVec_.begin(),
774  fragmentInfoVec_.end(),
775  [&queryInfo](const auto& fragment_owned_ptr) {
776  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
777  });
778  }
779  readLock.unlock();
780  queryInfo.setPhysicalNumTuples(0);
781  auto partIt = queryInfo.fragments.begin();
782  if (fragmentsExist) {
783  while (partIt != queryInfo.fragments.end()) {
784  if (partIt->getPhysicalNumTuples() == 0) {
785  // this means that a concurrent insert query inserted tuples into a new fragment
786  // but when the query came in we didn't have this fragment. To make sure we
787  // don't mess up the executor we delete this fragment from the metadatamap
788  // (fixes earlier bug found 2015-05-08)
789  partIt = queryInfo.fragments.erase(partIt);
790  } else {
791  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
792  partIt->getPhysicalNumTuples());
793  ++partIt;
794  }
795  }
796  } else {
797  // We added a dummy fragment and know the table is empty
798  queryInfo.setPhysicalNumTuples(0);
799  }
800  return queryInfo;
801 }
std::vector< int > levelSizes_
Definition: DataMgr.h:213
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
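
Callers (typically the query planning/execution layer) consume the returned TableInfo by walking its fragment list. A minimal sketch, assuming an already constructed fragmenter (illustrative name):

// Sketch only: enumerating the fragments of a table.
auto table_info = fragmenter.getFragmentsForQuery();
size_t total_rows = 0;
for (const auto& frag : table_info.fragments) {
  total_rows += frag.getPhysicalNumTuples();
  // frag.fragmentId, frag.deviceIds and frag.physicalTableId tell the caller
  // where this fragment's chunks live.
}
// total_rows now matches table_info.getPhysicalNumTuples(); an empty table is
// represented by a single dummy fragment with zero tuples.
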


size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumFragments ( )
overridevirtual

returns the number of fragments in a table

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 747 of file InsertOrderFragmenter.cpp.

747  {
748  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
749  return fragmentInfoVec_.size();
750 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1011 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), i, and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1012  {
1013  const auto data_buffer = chunk->getBuffer();
1014  const auto data_addr = data_buffer->getMemoryPtr();
1015  const size_t nrows_in_chunk = data_buffer->size();
1016  const size_t ncore = cpu_threads();
1017  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1018  std::vector<std::vector<uint64_t>> deleted_offsets;
1019  deleted_offsets.resize(ncore);
1020  std::vector<std::future<void>> threads;
1021  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1022  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1023  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1024  const auto ithread = rbegin / segsz;
1025  CHECK(ithread < deleted_offsets.size());
1026  deleted_offsets[ithread].reserve(segsz);
1027  for (size_t r = rbegin; r < rend; ++r) {
1028  if (data_addr[r]) {
1029  deleted_offsets[ithread].push_back(r);
1030  }
1031  }
1032  }));
1033  }
1034  wait_cleanup_threads(threads);
1035  std::vector<uint64_t> all_deleted_offsets;
1036  for (size_t i = 0; i < ncore; ++i) {
1037  all_deleted_offsets.insert(
1038  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1039  }
1040  return all_deleted_offsets;
1041 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
#define CHECK(condition)
Definition: Logger.h:209
int cpu_threads()
Definition: thread_count.h:24


bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 556 of file InsertOrderFragmenter.cpp.

References CHECK.

556  {
557  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
558 
559  for (auto const& fragment : fragmentInfoVec_) {
560  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
561  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
562  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
563  if (chunk_stats.max.tinyintval == 1) {
564  return true;
565  }
566  }
567  return false;
568 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:209
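
Because only chunk metadata is inspected (the maximum of the boolean delete column), this is a cheap pre-check before scheduling any vacuum work. A sketch, assuming the caller knows the table's delete column id and has a fragmenter in scope (illustrative names):

// Sketch only: skip vacuuming when no delete flag has ever been set.
if (!fragmenter.hasDeletedRows(delete_column_id)) {
  return;  // chunk stats show no deleted rows; nothing to compact
}
// otherwise collect offsets with getVacuumOffsets() and compact each fragment
// with compactRows(), as sketched under compactRows() above.
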
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insert_data_struct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 397 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

397  {
398  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
399  try {
400  // prevent two threads from trying to insert into the same table simultaneously
401  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
402  if (!isAddingNewColumns(insert_data_struct)) {
403  insertDataImpl(insert_data_struct);
404  } else {
405  addColumns(insert_data_struct);
406  }
407  if (defaultInsertLevel_ ==
408  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
409  dataMgr_->checkpoint(
410  chunkKeyPrefix_[0],
411  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
412  }
413  } catch (...) {
414  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
415  insert_data_struct.tableId);
416  // the statement below deletes *this* object!
417  // relying on exception propagation at this stage
418  // until we can sort this out in a cleaner fashion
419  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
420  throw;
421  }
422 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:495
void addColumns(const InsertData &insertDataStruct)
bool isAddingNewColumns(const InsertData &insert_data) const
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3127
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3099
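
A minimal sketch of feeding rows through this entry point follows; it assumes a table with a single non-null BIGINT column, and db_id, table_id, column_id and fragmenter are illustrative names supplied by the caller rather than part of this API.

// Sketch only: appending three rows of one BIGINT column via insertData().
Fragmenter_Namespace::InsertData insert_data;
insert_data.databaseId = db_id;
insert_data.tableId = table_id;
insert_data.numRows = 3;

std::vector<int64_t> values{10, 20, 30};
DataBlockPtr block;
block.numbersPtr = reinterpret_cast<int8_t*>(values.data());

insert_data.columnIds.push_back(column_id);
insert_data.data.push_back(block);
insert_data.is_default.push_back(false);

fragmenter.insertData(insert_data);  // takes the insert lock and checkpoints if data is on disk
// insertDataNoCheckpoint() is the variant for callers that manage checkpoints
// and locking themselves.
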


void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insert_data)
protected

Definition at line 570 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, i, Fragmenter_Namespace::InsertData::is_default, DataBlockPtr::numbersPtr, and Fragmenter_Namespace::InsertData::numRows.

570  {
571  // populate deleted system column if it should exist, as it will not come from client
572  std::unique_ptr<int8_t[]> data_for_deleted_column;
573  for (const auto& cit : columnMap_) {
574  if (cit.second.getColumnDesc()->isDeletedCol) {
575  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
576  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
577  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
578  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
579  insert_data.is_default.push_back(false);
580  break;
581  }
582  }
583  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
584  std::unordered_map<int, int> inverseInsertDataColIdMap;
585  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
586  inverseInsertDataColIdMap.insert(
587  std::make_pair(insert_data.columnIds[insertId], insertId));
588  }
589 
590  size_t numRowsLeft = insert_data.numRows;
591  size_t numRowsInserted = 0;
592  vector<DataBlockPtr> dataCopy =
593  insert_data.data; // bc append data will move ptr forward and this violates
594  // constness of InsertData
595  if (numRowsLeft <= 0) {
596  return;
597  }
598 
599  FragmentInfo* currentFragment{nullptr};
600 
601  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
602  // feels fragile
603  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
604  currentFragment = createNewFragment(defaultInsertLevel_);
605  } else {
606  currentFragment = fragmentInfoVec_.back().get();
607  }
608  CHECK(currentFragment);
609 
610  size_t startFragment = fragmentInfoVec_.size() - 1;
611 
612  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
613  // loop until done inserting all rows
614  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
615  size_t rowsLeftInCurrentFragment =
616  maxFragmentRows_ - currentFragment->shadowNumTuples;
617  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
618  if (rowsLeftInCurrentFragment != 0) {
619  for (auto& varLenColInfoIt : varLenColInfo_) {
620  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
621  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
622  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
623  if (insertIdIt != inverseInsertDataColIdMap.end()) {
624  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
625  numRowsToInsert = std::min(numRowsToInsert,
626  colMapIt->second.getNumElemsForBytesInsertData(
627  dataCopy[insertIdIt->second],
628  numRowsToInsert,
629  numRowsInserted,
630  bytesLeft,
631  insert_data.is_default[insertIdIt->second]));
632  }
633  }
634  }
635 
636  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
637  currentFragment = createNewFragment(defaultInsertLevel_);
638  if (numRowsInserted == 0) {
639  startFragment++;
640  }
641  rowsLeftInCurrentFragment = maxFragmentRows_;
642  for (auto& varLenColInfoIt : varLenColInfo_) {
643  varLenColInfoIt.second = 0; // reset byte counter
644  }
645  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
646  for (auto& varLenColInfoIt : varLenColInfo_) {
647  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
648  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
649  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
650  if (insertIdIt != inverseInsertDataColIdMap.end()) {
651  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
652  numRowsToInsert = std::min(numRowsToInsert,
653  colMapIt->second.getNumElemsForBytesInsertData(
654  dataCopy[insertIdIt->second],
655  numRowsToInsert,
656  numRowsInserted,
657  bytesLeft,
658  insert_data.is_default[insertIdIt->second]));
659  }
660  }
661  }
662 
663  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
664  // never be able to insert anything
665 
666  {
667  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
668  // for each column, append the data in the appropriate insert buffer
669  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
670  int columnId = insert_data.columnIds[i];
671  auto colMapIt = columnMap_.find(columnId);
672  CHECK(colMapIt != columnMap_.end());
673  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
674  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
675  auto varLenColInfoIt = varLenColInfo_.find(columnId);
676  if (varLenColInfoIt != varLenColInfo_.end()) {
677  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
678  }
679  }
680  if (hasMaterializedRowId_) {
681  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
682  currentFragment->shadowNumTuples;
683  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
684  for (size_t i = 0; i < numRowsToInsert; ++i) {
685  row_id_data[i] = i + startId;
686  }
687  DataBlockPtr rowIdBlock;
688  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
689  auto colMapIt = columnMap_.find(rowIdColId_);
690  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
691  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
692  }
693 
694  currentFragment->shadowNumTuples =
695  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
696  numRowsLeft -= numRowsToInsert;
697  numRowsInserted += numRowsToInsert;
698  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
699  partIt != fragmentInfoVec_.end();
700  ++partIt) {
701  auto fragment_ptr = partIt->get();
702  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
703  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
704  }
705  }
706  }
707  numTuples_ += insert_data.numRows;
708  dropFragmentsToSizeNoInsertLock(maxRows_);
709 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:221
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:220
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:226
void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insert_data_struct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 424 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

424  {
425  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
426  mapd_unique_lock<mapd_shared_mutex> insertLock(
427  insertMutex_); // prevent two threads from trying to insert into the same table
428  // simultaneously
429  if (!isAddingNewColumns(insert_data_struct)) {
430  insertDataImpl(insert_data_struct);
431  } else {
432  addColumns(insert_data_struct);
433  }
434 }
void addColumns(const InsertData &insertDataStruct)
bool isAddingNewColumns(const InsertData &insert_data) const


bool Fragmenter_Namespace::InsertOrderFragmenter::isAddingNewColumns ( const InsertData &  insert_data) const
private

Definition at line 382 of file InsertOrderFragmenter.cpp.

References CHECK, and Fragmenter_Namespace::InsertData::columnIds.

382  {
383  bool all_columns_already_exist = true, all_columns_are_new = true;
384  for (const auto column_id : insert_data.columnIds) {
385  if (columnMap_.find(column_id) == columnMap_.end()) {
386  all_columns_already_exist = false;
387  } else {
388  all_columns_are_new = false;
389  }
390  }
391  // only one should be TRUE
392  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
393  CHECK(either_all_exist_or_all_new);
394  return all_columns_are_new;
395 }
#define CHECK(condition)
Definition: Logger.h:209
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter & Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter &  )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::resetSizesFromFragments ( )
overridevirtual

Resets the fragmenter's size-related metadata using the internal fragment info vector. This is typically done after operations, such as vacuuming, that can change fragment sizes.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 803 of file InsertOrderFragmenter.cpp.

803  {
804  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
805  numTuples_ = 0;
806  for (const auto& fragment_info : fragmentInfoVec_) {
807  numTuples_ += fragment_info->getPhysicalNumTuples();
808  }
809  setLastFragmentVarLenColumnSizes();
810 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
mapd_shared_lock< mapd_shared_mutex > read_lock
void Fragmenter_Namespace::InsertOrderFragmenter::setLastFragmentVarLenColumnSizes ( )
private

Definition at line 812 of file InsertOrderFragmenter.cpp.

812  {
813  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
814  // Now need to get the insert buffers for each column - should be last
815  // fragment
816  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
817  // TODO: add accessor here for safe indexing
818  int deviceId =
819  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
820  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
821  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
822  insertKey.push_back(colIt->first); // column id
823  insertKey.push_back(lastFragmentId); // fragment id
824  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
825  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
826  if (varLenColInfoIt != varLenColInfo_.end()) {
827  varLenColInfoIt->second = colIt->second.getBuffer()->size();
828  }
829  }
830  }
831 }
std::vector< int > ChunkKey
Definition: types.h:37
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map,
std::optional< Data_Namespace::MemoryLevel memory_level 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 281 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

284  {
285  // synchronize concurrent accesses to fragmentInfoVec_
286  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
292  if (shard_ >= 0) {
293  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
294  }
295 
296  CHECK(cd);
297  const auto column_id = cd->columnId;
298  const auto col_itr = columnMap_.find(column_id);
299  CHECK(col_itr != columnMap_.end());
300 
301  for (auto const& fragment : fragmentInfoVec_) {
302  auto stats_itr = stats_map.find(fragment->fragmentId);
303  if (stats_itr != stats_map.end()) {
304  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
305  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
306  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
307  physicalTableId_,
308  column_id,
309  fragment->fragmentId};
310  auto chunk = Chunk_NS::Chunk::getChunk(cd,
311  &catalog_->getDataMgr(),
312  chunk_key,
313  memory_level.value_or(defaultInsertLevel_),
314  0,
315  chunk_meta_it->second->numBytes,
316  chunk_meta_it->second->numElements);
317  auto buf = chunk->getBuffer();
318  CHECK(buf);
319  if (!buf->hasEncoder()) {
320  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
321  }
322  auto encoder = buf->getEncoder();
323 
324  auto chunk_stats = stats_itr->second;
325 
326  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
327  encoder->getMetadata(old_chunk_metadata);
328  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
329 
330  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
331  // Use the logical type to display data, since the encoding should be ignored
332  const auto logical_ti = cd->columnType.is_dict_encoded_string()
333  ? SQLTypeInfo(kBIGINT, false)
334  : get_logical_type_info(cd->columnType);
335  if (!didResetStats) {
336  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
337  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
338  << DatumToString(chunk_stats.max, logical_ti);
339  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
340  << DatumToString(chunk_stats.min, logical_ti);
341  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
342  continue; // move to next fragment
343  }
344 
345  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
346  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
347  << DatumToString(chunk_stats.max, logical_ti);
348  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
349  << DatumToString(chunk_stats.min, logical_ti);
350  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
351 
352  // Reset fragment metadata map and set buffer to dirty
353  auto new_metadata = std::make_shared<ChunkMetadata>();
354  // Run through fillChunkStats to ensure any transformations to the raw metadata
355  // values get applied (e.g. for date in days)
356  encoder->getMetadata(new_metadata);
357 
358  fragment->setChunkMetadata(column_id, new_metadata);
359  fragment->shadowChunkMetadataMap =
360  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
362  buf->setDirty();
363  }
364  } else {
365  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
366  << ", table " << physicalTableId_ << ", "
367  << ", column " << column_id;
368  }
369  }
370 }
std::vector< int > ChunkKey
Definition: types.h:37
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:388
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
#define LOG(tag)
Definition: Logger.h:203
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:931
std::string show_chunk(const ChunkKey &key)
Definition: types.h:86
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:209
bool is_dict_encoded_string() const
Definition: sqltypes.h:541
SQLTypeInfo columnType
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:303
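In outline, updateChunkStats() walks fragmentInfoVec_, looks up the caller-supplied ChunkStats for each fragment id in stats_map, and rewrites the chunk's encoder stats and fragment metadata when a reset is possible, logging and skipping otherwise. A simplified standalone sketch of that lookup-and-skip loop follows; StatsLite and FragmentLite are illustrative stand-ins rather than the real ChunkStats and FragmentInfo, and all encoder and buffer interaction is omitted.

#include <iostream>
#include <unordered_map>
#include <vector>

// Simplified stand-ins for ChunkStats and a fragment's per-column metadata.
struct StatsLite {
  long long min;
  long long max;
  bool has_nulls;
};

struct FragmentLite {
  int fragment_id;
  StatsLite chunk_stats;  // stats currently recorded for one column's chunk
};

// Mirrors the control flow of updateChunkStats() for a single column: only
// fragments present in stats_map are touched; fragments without an entry are
// reported and skipped, as in the listing above.
void update_chunk_stats(std::vector<FragmentLite>& fragments,
                        const std::unordered_map<int, StatsLite>& stats_map) {
  for (auto& fragment : fragments) {
    const auto stats_itr = stats_map.find(fragment.fragment_id);
    if (stats_itr == stats_map.end()) {
      std::cout << "No chunk stats update found for fragment "
                << fragment.fragment_id << "\n";
      continue;
    }
    fragment.chunk_stats = stats_itr->second;  // reset min/max/null info
  }
}

int main() {
  std::vector<FragmentLite> fragments{{0, {0, 100, false}}, {1, {0, 50, false}}};
  update_chunk_stats(fragments, {{0, {5, 42, true}}});  // only fragment 0 updated
}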


std::optional< ChunkUpdateStats > Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 633 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, CHECK_GT, Fragmenter_Namespace::ChunkUpdateStats::chunk, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), Fragmenter_Namespace::ChunkUpdateStats::fragment_rows_count, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::ChunkUpdateStats::new_values_stats, Fragmenter_Namespace::ChunkUpdateStats::old_values_stats, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, TableDescriptor::tableId, temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, Fragmenter_Namespace::anonymous_namespace{UpdelStorage.cpp}::update_metadata(), foreign_storage::update_stats(), updateColumnMetadata(), Fragmenter_Namespace::ChunkUpdateStats::updated_rows_count, NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

642  {
643  updel_roll.catalog = catalog;
644  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
645  updel_roll.memoryLevel = memory_level;
646 
647  const size_t ncore = cpu_threads();
648  const auto nrow = frag_offsets.size();
649  const auto n_rhs_values = rhs_values.size();
650  if (0 == nrow) {
651  return {};
652  }
653  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
654 
655  auto fragment_ptr = getFragmentInfo(fragment_id);
656  auto& fragment = *fragment_ptr;
657  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
658  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
659  ChunkKey chunk_key{
660  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
661  auto chunk = Chunk_NS::Chunk::getChunk(cd,
662  &catalog->getDataMgr(),
663  chunk_key,
664  Data_Namespace::CPU_LEVEL,
665  0,
666  chunk_meta_it->second->numBytes,
667  chunk_meta_it->second->numElements);
668 
669  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
670 
671  // parallel update elements
672  std::vector<std::future<void>> threads;
673 
674  const auto segsz = (nrow + ncore - 1) / ncore;
675  auto dbuf = chunk->getBuffer();
676  auto dbuf_addr = dbuf->getMemoryPtr();
677  dbuf->setUpdated();
678  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
679  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
680  threads.emplace_back(std::async(
681  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
682  SQLTypeInfo lhs_type = cd->columnType;
683 
684  // !! Not sure if this is an undocumented convention or a bug, but for a sharded
685  // table the dictionary id of an encoded string column is not specified by
686  // comp_param in the physical table but in the logical table. comp_param in the
687  // physical table is always 0, so we need to adapt accordingly...
688  auto cdl = (shard_ < 0)
689  ? cd
690  : catalog->getMetadataForColumn(
691  catalog->getLogicalTableId(td->tableId), cd->columnId);
692  CHECK(cdl);
693  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
694  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
695  lhs_type, &decimalOverflowValidator);
696  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
697  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
698  lhs_type, &dateDaysOverflowValidator);
699 
700  StringDictionary* stringDict{nullptr};
701  if (lhs_type.is_string()) {
702  CHECK(kENCODING_DICT == lhs_type.get_compression());
703  auto dictDesc = const_cast<DictDescriptor*>(
704  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
705  CHECK(dictDesc);
706  stringDict = dictDesc->stringDict.get();
707  CHECK(stringDict);
708  }
709 
710  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
711  const auto roffs = frag_offsets[r];
712  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
713  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
714  ScalarTargetValue sv2;
715 
716  // Subtle here is on the two cases of string-to-string assignments, when
717  // upstream passes RHS string as a string index instead of a preferred "real
718  // string".
719  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
720  // index
721  // in this layer, so if upstream passes a str idx here, an
722  // exception is thrown.
723  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
724  // str idx.
725  if (rhs_type.is_string()) {
726  if (const auto vp = boost::get<int64_t>(sv)) {
727  auto dictDesc = const_cast<DictDescriptor*>(
728  catalog->getMetadataForDict(rhs_type.get_comp_param()));
729  if (nullptr == dictDesc) {
730  throw std::runtime_error(
731  "UPDATE does not support cast from string literal to string "
732  "column.");
733  }
734  auto stringDict = dictDesc->stringDict.get();
735  CHECK(stringDict);
736  sv2 = NullableString(stringDict->getString(*vp));
737  sv = &sv2;
738  }
739  }
740 
741  if (const auto vp = boost::get<int64_t>(sv)) {
742  auto v = *vp;
743  if (lhs_type.is_string()) {
744  throw std::runtime_error("UPDATE does not support cast to string.");
745  }
746  int64_t old_val;
747  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
748  // Handle special case where date column with date in days encoding stores
749  // metadata in epoch seconds.
750  if (lhs_type.is_date_in_days()) {
751  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
752  }
753  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
754  if (lhs_type.is_decimal()) {
755  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
756  int64_t decimal_val;
757  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
758  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
759  lhs_type.get_notnull() == false)
760  ? v
761  : decimal_val;
762  update_metadata(
763  lhs_type, update_stats_per_thread[c], target_value, old_val);
764  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
765  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
766  if (positive_v_and_negative_d || negative_v_and_positive_d) {
767  throw std::runtime_error(
768  "Data conversion overflow on " + std::to_string(v) +
769  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
770  std::to_string(rhs_type.get_scale()) + ") to (" +
771  std::to_string(lhs_type.get_dimension()) + ", " +
772  std::to_string(lhs_type.get_scale()) + ")");
773  }
774  } else if (is_integral(lhs_type)) {
775  if (lhs_type.is_date_in_days()) {
776  // Store meta values in seconds
777  if (lhs_type.get_size() == 2) {
778  nullAwareDateOverflowValidator.validate<int16_t>(v);
779  } else {
780  nullAwareDateOverflowValidator.validate<int32_t>(v);
781  }
782  int64_t days;
783  get_scalar<int64_t>(data_ptr, lhs_type, days);
784  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
785  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
786  lhs_type.get_notnull() == false)
787  ? NullSentinelSupplier()(lhs_type, v)
788  : seconds;
789  update_metadata(
790  lhs_type, update_stats_per_thread[c], target_value, old_val);
791  } else {
792  int64_t target_value;
793  if (rhs_type.is_decimal()) {
794  target_value = round(decimal_to_double(rhs_type, v));
795  } else {
796  target_value = v;
797  }
798  update_metadata(
799  lhs_type, update_stats_per_thread[c], target_value, old_val);
800  }
801  } else {
802  if (rhs_type.is_decimal()) {
803  update_metadata(lhs_type,
804  update_stats_per_thread[c],
805  decimal_to_double(rhs_type, v),
806  double(old_val));
807  } else {
808  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
809  }
810  }
811  } else if (const auto vp = boost::get<double>(sv)) {
812  auto v = *vp;
813  if (lhs_type.is_string()) {
814  throw std::runtime_error("UPDATE does not support cast to string.");
815  }
816  double old_val;
817  get_scalar<double>(data_ptr, lhs_type, old_val);
818  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
819  if (lhs_type.is_integer()) {
820  update_metadata(
821  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
822  } else if (lhs_type.is_fp()) {
823  update_metadata(
824  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
825  } else {
826  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
827  "LHS with a floating RHS.";
828  }
829  } else if (const auto vp = boost::get<float>(sv)) {
830  auto v = *vp;
831  if (lhs_type.is_string()) {
832  throw std::runtime_error("UPDATE does not support cast to string.");
833  }
834  float old_val;
835  get_scalar<float>(data_ptr, lhs_type, old_val);
836  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
837  if (lhs_type.is_integer()) {
838  update_metadata(
839  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
840  } else {
841  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
842  }
843  } else if (const auto vp = boost::get<NullableString>(sv)) {
844  const auto s = boost::get<std::string>(vp);
845  const auto sval = s ? *s : std::string("");
846  if (lhs_type.is_string()) {
847  decltype(stringDict->getOrAdd(sval)) sidx;
848  {
849  std::unique_lock<std::mutex> lock(temp_mutex_);
850  sidx = stringDict->getOrAdd(sval);
851  }
852  int64_t old_val;
853  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
854  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
855  update_metadata(
856  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
857  } else if (sval.size() > 0) {
858  auto dval = std::atof(sval.data());
859  if (lhs_type.is_boolean()) {
860  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
861  } else if (lhs_type.is_time()) {
862  throw std::runtime_error(
863  "Date/Time/Timestamp update not supported through translated "
864  "string path.");
865  }
866  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
867  double old_val;
868  get_scalar<double>(data_ptr, lhs_type, old_val);
869  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
870  update_metadata(
871  lhs_type, update_stats_per_thread[c], double(dval), old_val);
872  } else {
873  int64_t old_val;
874  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
875  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
876  update_metadata(
877  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
878  }
879  } else {
880  put_null(data_ptr, lhs_type, cd->columnName);
881  update_stats_per_thread[c].new_values_stats.has_null = true;
882  }
883  } else {
884  CHECK(false);
885  }
886  }
887  }));
888  if (threads.size() >= (size_t)cpu_threads()) {
889  wait_cleanup_threads(threads);
890  }
891  }
892  wait_cleanup_threads(threads);
893 
894  // for unit test
896  if (cd->isDeletedCol) {
897  const auto deleted_offsets = getVacuumOffsets(chunk);
898  if (deleted_offsets.size() > 0) {
899  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
900  return {};
901  }
902  }
903  }
904  ChunkUpdateStats update_stats;
905  for (size_t c = 0; c < ncore; ++c) {
906  update_metadata(update_stats.new_values_stats,
907  update_stats_per_thread[c].new_values_stats);
908  update_metadata(update_stats.old_values_stats,
909  update_stats_per_thread[c].old_values_stats);
910  }
911 
912  CHECK_GT(fragment.shadowNumTuples, size_t(0));
913  updateColumnMetadata(
914  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
915  update_stats.updated_rows_count = nrow;
916  update_stats.fragment_rows_count = fragment.shadowNumTuples;
917  update_stats.chunk = chunk;
918  return update_stats;
919 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:54
std::vector< int > ChunkKey
Definition: types.h:37
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:223
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
#define UNREACHABLE()
Definition: Logger.h:253
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4246
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1524
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
int logicalTableId
Definition: UpdelRoll.h:53
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:209
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
Descriptor for a dictionary for a string column.
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:504
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:507
std::string columnName
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
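The per-row work in updateColumn() is split across CPU threads: rows are divided into ceil(nrow / ncore) sized segments, each segment is handled by one std::async task that accumulates into its own ChunkUpdateStats slot, and the tasks are joined before the per-thread stats are merged. Below is a standalone sketch of just that partitioning pattern, with a generic per-row callback standing in for the actual update logic.

#include <algorithm>
#include <functional>
#include <future>
#include <thread>
#include <vector>

// Mirrors the work-splitting pattern in updateColumn(): rows are cut into
// ceil(nrow / ncore) sized segments, each segment is processed by one
// std::async task, and all tasks are joined before any results are merged.
void for_each_row_in_parallel(const size_t nrow,
                              const std::function<void(size_t)>& fn) {
  const size_t ncore = std::max<size_t>(1, std::thread::hardware_concurrency());
  const size_t segsz = (nrow + ncore - 1) / ncore;  // rows per worker, rounded up
  std::vector<std::future<void>> workers;
  for (size_t rbegin = 0; rbegin < nrow; rbegin += segsz) {
    workers.emplace_back(std::async(std::launch::async, [rbegin, segsz, nrow, &fn] {
      for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); ++r) {
        fn(r);
      }
    }));
  }
  for (auto& worker : workers) {
    worker.wait();
  }
}

int main() {
  std::vector<int> updated(100, 0);
  // Each row index is visited by exactly one worker, so per-row writes need
  // no synchronization; only a shared accumulator would.
  for_each_row_in_parallel(updated.size(), [&](const size_t r) { updated[r] = 1; });
  return std::count(updated.begin(), updated.end(), 1) == 100 ? 0 : 1;
}

Giving each worker its own slot in update_stats_per_thread (indexed by c in the listing) is what keeps the workers lock-free; only the final merge into update_stats is serial.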


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 54 of file UpdelStorage.cpp.

References updateColumn().

62  {
63  updateColumn(catalog,
64  td,
65  cd,
66  fragment_id,
67  frag_offsets,
68  std::vector<ScalarTargetValue>(1, rhs_value),
69  rhs_type,
70  memory_level,
71  updel_roll);
72 }
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor *  cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata >  metadata 
)
overridevirtual

Updates the metadata for a column chunk.

Parameters
cd - ColumnDescriptor for the column
fragment_id - Fragment id of the chunk within the column
metadata - shared_ptr of the metadata to update the column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 268 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

271  {
272  // synchronize concurrent accesses to fragmentInfoVec_
273  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
274 
275  CHECK(metadata.get());
276  auto fragment_info = getFragmentInfo(fragment_id);
277  CHECK(fragment_info);
278  fragment_info->setChunkMetadata(cd->columnId, metadata);
279 }
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:209
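updateColumnChunkMetadata() takes the fragmentInfoMutex_ write lock, locates the fragment, and replaces the metadata recorded for one column's chunk. A simplified standalone sketch of that flow follows; MetadataLite, FragmentLite, and FragmenterLite are illustrative stand-ins for the real ChunkMetadata, FragmentInfo, and fragmenter state.

#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>

// Simplified stand-ins for ChunkMetadata, FragmentInfo, and the fragmenter.
struct MetadataLite {
  size_t num_bytes = 0;
  size_t num_elements = 0;
};

struct FragmentLite {
  std::map<int, std::shared_ptr<MetadataLite>> chunk_metadata_by_column;
};

struct FragmenterLite {
  std::shared_mutex fragment_info_mutex;
  std::map<int, FragmentLite> fragments_by_id;

  // Mirrors updateColumnChunkMetadata(): take the write lock, locate the
  // fragment, and overwrite the metadata recorded for one column's chunk.
  void update_column_chunk_metadata(const int column_id,
                                    const int fragment_id,
                                    std::shared_ptr<MetadataLite> metadata) {
    std::unique_lock<std::shared_mutex> write_lock(fragment_info_mutex);
    assert(metadata);
    const auto fragment_itr = fragments_by_id.find(fragment_id);
    assert(fragment_itr != fragments_by_id.end());
    fragment_itr->second.chunk_metadata_by_column[column_id] = std::move(metadata);
  }
};

int main() {
  FragmenterLite fragmenter;
  fragmenter.fragments_by_id.emplace(0, FragmentLite{});
  fragmenter.update_column_chunk_metadata(
      3, 0, std::make_shared<MetadataLite>(MetadataLite{4096, 1024}));
}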
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const UpdateValuesStats &  update_values_stats,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 921 of file UpdelStorage.cpp.

References UpdelRoll::catalog, ColumnDescriptor::columnId, ColumnDescriptor::columnType, fragmentInfoMutex_, UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getMetadataForTable(), Fragmenter_Namespace::UpdateValuesStats::has_null, SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, Fragmenter_Namespace::UpdateValuesStats::max_double, Fragmenter_Namespace::UpdateValuesStats::max_int64t, Fragmenter_Namespace::UpdateValuesStats::min_double, Fragmenter_Namespace::UpdateValuesStats::min_int64t, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

927  {
928  mapd_unique_lock<mapd_shared_mutex> write_lock(fragmentInfoMutex_);
929  auto buffer = chunk->getBuffer();
930  const auto& lhs_type = cd->columnType;
931 
932  auto encoder = buffer->getEncoder();
933  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
934  static_assert(std::is_same<decltype(min), decltype(max)>::value,
935  "Type mismatch on min/max");
936  if (has_null) {
937  encoder->updateStats(decltype(min)(), true);
938  }
939  if (max < min) {
940  return;
941  }
942  encoder->updateStats(min, false);
943  encoder->updateStats(max, false);
944  };
945 
946  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
947  update_stats(new_values_stats.min_int64t,
948  new_values_stats.max_int64t,
949  new_values_stats.has_null);
950  } else if (lhs_type.is_fp()) {
951  update_stats(new_values_stats.min_double,
952  new_values_stats.max_double,
953  new_values_stats.has_null);
954  } else if (lhs_type.is_decimal()) {
955  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
956  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
957  new_values_stats.has_null);
958  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
959  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
960  update_stats(new_values_stats.min_int64t,
961  new_values_stats.max_int64t,
962  new_values_stats.has_null);
963  }
964  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
965  auto chunk_metadata =
966  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
967  buffer->getEncoder()->getMetadata(chunk_metadata);
968 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
mapd_unique_lock< mapd_shared_mutex > write_lock
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:507
bool is_integral(const SQLTypeInfo &t)
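The update_stats lambda in the listing above records a null unconditionally but folds min/max into the encoder only when the new-values batch actually contained non-null data (otherwise max < min and the batch is skipped). Below is a standalone sketch of that merge rule over plain integer stats; RunningStats is an illustrative stand-in for the encoder's chunk stats, not a real OmniSciDB type.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

// Simplified running statistics standing in for the chunk encoder's stats.
struct RunningStats {
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();
  bool has_nulls = false;
};

// Mirrors the update_stats lambda in updateColumnMetadata(): nulls are
// recorded unconditionally, but min/max are folded in only when the batch
// actually contained non-null values (i.e. batch_max >= batch_min).
void fold_batch_stats(RunningStats& stats,
                      const int64_t batch_min,
                      const int64_t batch_max,
                      const bool batch_has_null) {
  if (batch_has_null) {
    stats.has_nulls = true;
  }
  if (batch_max < batch_min) {
    return;  // batch held no non-null values; nothing to widen
  }
  stats.min = std::min(stats.min, batch_min);
  stats.max = std::max(stats.max, batch_max);
}

int main() {
  RunningStats stats;
  fold_batch_stats(stats, 10, 42, false);
  fold_batch_stats(stats, 1, 0, true);  // null-only batch: min/max untouched
  assert(stats.min == 10 && stats.max == 42 && stats.has_nulls);
}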


void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll,
Executor *  executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 267 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::InsertData::columnIds, cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), i, insertDataNoCheckpoint(), Fragmenter_Namespace::InsertData::is_default, SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

277  {
278  updelRoll.is_varlen_update = true;
279  updelRoll.catalog = catalog;
280  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
281  updelRoll.memoryLevel = memoryLevel;
282 
283  size_t num_entries = sourceDataProvider.getEntryCount();
284  size_t num_rows = sourceDataProvider.getRowCount();
285 
286  if (0 == num_rows) {
287  // bail out early
288  return;
289  }
290 
291  TargetValueConverterFactory factory;
292 
293  auto fragment_ptr = getFragmentInfo(fragmentId);
294  auto& fragment = *fragment_ptr;
295  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
296  get_chunks(catalog, td, fragment, memoryLevel, chunks);
297  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
298  columnDescriptors.size());
299  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
300  size_t indexOfDeletedColumn{0};
301  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
302  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
303  auto chunk = chunks[indexOfChunk];
304  const auto chunk_cd = chunk->getColumnDesc();
305 
306  if (chunk_cd->isDeletedCol) {
307  indexOfDeletedColumn = chunk_cd->columnId;
308  deletedChunk = chunk;
309  continue;
310  }
311 
312  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
313  columnDescriptors.end(),
314  [=](const ColumnDescriptor* cd) -> bool {
315  return cd->columnId == chunk_cd->columnId;
316  });
317 
318  if (targetColumnIt != columnDescriptors.end()) {
319  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
320 
321  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
322  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
323 
325  num_rows,
326  *catalog,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  targetDescriptor->columnType,
330  !targetDescriptor->columnType.get_notnull(),
331  sourceDataProvider.getLiteralDictionary(),
332  g_enable_experimental_string_functions
333  ? executor->getStringDictionaryProxy(
334  sourceDataMetaInfo.get_type_info().get_comp_param(),
335  executor->getRowSetMemoryOwner(),
336  true)
337  : nullptr};
338  auto converter = factory.create(param);
339  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
340 
341  if (targetDescriptor->columnType.is_geometry()) {
342  // geometry columns are composites
343  // need to skip chunks, depending on geo type
344  switch (targetDescriptor->columnType.get_type()) {
345  case kMULTIPOLYGON:
346  indexOfChunk += 5;
347  break;
348  case kPOLYGON:
349  indexOfChunk += 4;
350  break;
351  case kLINESTRING:
352  indexOfChunk += 2;
353  break;
354  case kPOINT:
355  indexOfChunk += 1;
356  break;
357  default:
358  CHECK(false); // not supported
359  }
360  }
361  } else {
362  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
363  std::unique_ptr<ChunkToInsertDataConverter> converter;
364 
365  if (chunk_cd->columnType.is_fixlen_array()) {
366  converter =
367  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
368  } else if (chunk_cd->columnType.is_string()) {
369  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
370  } else if (chunk_cd->columnType.is_geometry()) {
371  // the logical geo column is a string column
372  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
373  } else {
374  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
375  }
376 
377  chunkConverters.push_back(std::move(converter));
378 
379  } else if (chunk_cd->columnType.is_date_in_days()) {
380  /* Q: Why do we need this?
381  A: In variable length updates path we move the chunk content of column
382  without decoding. Since it again passes through DateDaysEncoder
383  the expected value should be in seconds, but here it will be in days.
384  Therefore, using DateChunkConverter chunk values are being scaled to
385  seconds which then ultimately encoded in days in DateDaysEncoder.
386  */
387  std::unique_ptr<ChunkToInsertDataConverter> converter;
388  const size_t physical_size = chunk_cd->columnType.get_size();
389  if (physical_size == 2) {
390  converter =
391  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
392  } else if (physical_size == 4) {
393  converter =
394  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
395  } else {
396  CHECK(false);
397  }
398  chunkConverters.push_back(std::move(converter));
399  } else {
400  std::unique_ptr<ChunkToInsertDataConverter> converter;
401  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
402  int logical_size = logical_type.get_size();
403  int physical_size = chunk_cd->columnType.get_size();
404 
405  if (logical_type.is_string()) {
406  // for dicts -> logical = physical
407  logical_size = physical_size;
408  }
409 
410  if (8 == physical_size) {
411  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
412  num_rows, chunk.get());
413  } else if (4 == physical_size) {
414  if (8 == logical_size) {
415  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
416  num_rows, chunk.get());
417  } else {
418  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
419  num_rows, chunk.get());
420  }
421  } else if (2 == chunk_cd->columnType.get_size()) {
422  if (8 == logical_size) {
423  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
424  num_rows, chunk.get());
425  } else if (4 == logical_size) {
426  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
427  num_rows, chunk.get());
428  } else {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
430  num_rows, chunk.get());
431  }
432  } else if (1 == chunk_cd->columnType.get_size()) {
433  if (8 == logical_size) {
434  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
435  num_rows, chunk.get());
436  } else if (4 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
438  num_rows, chunk.get());
439  } else if (2 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
441  num_rows, chunk.get());
442  } else {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
444  num_rows, chunk.get());
445  }
446  } else {
447  CHECK(false); // unknown
448  }
449 
450  chunkConverters.push_back(std::move(converter));
451  }
452  }
453  }
454 
455  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
456  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
457 
458  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
459  bool* deletedChunkBuffer =
460  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
461 
462  std::atomic<size_t> row_idx{0};
463 
464  auto row_converter = [&sourceDataProvider,
465  &sourceDataConverters,
466  &indexOffFragmentOffsetColumn,
467  &chunkConverters,
468  &deletedChunkBuffer,
469  &row_idx](size_t indexOfEntry) -> void {
470  // convert the source data
471  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
472  if (row.empty()) {
473  return;
474  }
475 
476  size_t indexOfRow = row_idx.fetch_add(1);
477 
478  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
479  if (sourceDataConverters[col]) {
480  const auto& mapd_variant = row[col];
481  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
482  }
483  }
484 
485  auto scalar = checked_get(
486  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
487  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
488 
489  // convert the remaining chunks
490  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
491  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
492  }
493 
494  // now mark the row as deleted
495  deletedChunkBuffer[indexInChunkBuffer] = true;
496  };
497 
498  bool can_go_parallel = num_rows > 20000;
499 
500  if (can_go_parallel) {
501  const size_t num_worker_threads = cpu_threads();
502  std::vector<std::future<void>> worker_threads;
503  for (size_t i = 0,
504  start_entry = 0,
505  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
506  i < num_worker_threads && start_entry < num_entries;
507  ++i, start_entry += stride) {
508  const auto end_entry = std::min(start_entry + stride, num_rows);
509  worker_threads.push_back(std::async(
510  std::launch::async,
511  [&row_converter](const size_t start, const size_t end) {
512  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
513  row_converter(indexOfRow);
514  }
515  },
516  start_entry,
517  end_entry));
518  }
519 
520  for (auto& child : worker_threads) {
521  child.wait();
522  }
523 
524  } else {
525  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
526  row_converter(entryIdx);
527  }
528  }
529 
530  Fragmenter_Namespace::InsertData insert_data;
531  insert_data.databaseId = catalog->getCurrentDB().dbId;
532  insert_data.tableId = td->tableId;
533 
534  for (size_t i = 0; i < chunkConverters.size(); i++) {
535  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
536  continue;
537  }
538 
539  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
540  if (sourceDataConverters[i]) {
541  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
542  }
543  continue;
544  }
545 
546  insert_data.numRows = num_rows;
547  insert_data.is_default.resize(insert_data.columnIds.size(), false);
548  insertDataNoCheckpoint(insert_data);
549 
550  // update metadata for the deleted chunk as we are doing special handling
551  auto chunkMetadata =
552  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
553  chunkMetadata->chunkStats.max.boolval = 1;
554 
555  // I'm not completely sure that we need to do this on both the fragment and the buffer,
556  // but leaving this alone for now
557  if (!deletedChunk->getBuffer()->hasEncoder()) {
558  deletedChunk->initEncoder();
559  }
560  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
561 
562  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
563  // An append to the same fragment will increase shadowNumTuples.
564  // Update NumElems in this case. Otherwise, use existing NumElems.
565  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
566  }
567  deletedChunk->getBuffer()->setUpdated();
568 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:54
bool is_varlen_update
Definition: UpdelRoll.h:56
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:931
std::vector< bool > is_default
Definition: Fragmenter.h:66
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
future< Result > async(Fn &&fn, Args &&...args)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:52
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4246
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
bool g_enable_experimental_string_functions
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:53
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
#define CHECK(condition)
Definition: Logger.h:209
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
bool is_string() const
Definition: sqltypes.h:504
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:24
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
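One detail worth calling out from the listing: a logical geometry column is backed by several physical chunks, so when updateColumns() matches a geometry target column it advances the chunk index past the extra chunks (5 for MULTIPOLYGON, 4 for POLYGON, 2 for LINESTRING, 1 for POINT). A small sketch of that skip table follows; the enum here is an illustrative stand-in for the SQLTypes values, not the real enumeration.

#include <cassert>
#include <cstddef>

// Illustrative stand-in for the SQLTypes geo enumerators, not the real enum.
enum class GeoType { kPoint, kLineString, kPolygon, kMultiPolygon };

// Mirrors the switch in updateColumns(): a logical geometry column is backed
// by several physical chunks, so the chunk index must jump past all of them.
int extra_physical_chunks(const GeoType type) {
  switch (type) {
    case GeoType::kMultiPolygon:
      return 5;
    case GeoType::kPolygon:
      return 4;
    case GeoType::kLineString:
      return 2;
    case GeoType::kPoint:
      return 1;
  }
  return 0;  // unreachable for the handled geo types
}

int main() {
  size_t index_of_chunk = 0;
  index_of_chunk += extra_physical_chunks(GeoType::kPolygon);  // skip the auxiliary chunks
  assert(index_of_chunk == 4);
}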


void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 970 of file UpdelStorage.cpp.

References fragmentInfoMutex_, UpdelRoll::getChunkMetadataMap(), and UpdelRoll::getNumTuple().

972  {
973  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
974  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
975  auto& fragmentInfo = *key.second;
976  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
977  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
978  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
979  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
980 }
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
size_t getNumTuple(const MetaDataKey &key) const


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1074 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1077  {
1078  const auto cd = chunk->getColumnDesc();
1079  const auto& col_type = cd->columnType;
1080  auto data_buffer = chunk->getBuffer();
1081  auto data_addr = data_buffer->getMemoryPtr();
1082  auto element_size =
1083  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1084  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1085  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1086  size_t nbytes_fix_data_to_keep = 0;
1087  auto nrows_to_vacuum = frag_offsets.size();
1088  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1089  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1090  auto is_last_one = irow == nrows_to_vacuum;
1091  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1092  auto maddr_to_vacuum = data_addr;
1093  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1094  if (nrows_to_keep > 0) {
1095  auto nbytes_to_keep = nrows_to_keep * element_size;
1096  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1097  // move curr fixlen row block toward front
1098  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1099  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1100  nbytes_to_keep);
1101  }
1102  irow_of_blk_to_fill += nrows_to_keep;
1103  nbytes_fix_data_to_keep += nbytes_to_keep;
1104  }
1105  irow_of_blk_to_keep = irow_to_vacuum + 1;
1106  }
1107  return nbytes_fix_data_to_keep;
1108 }
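vacuum_fixlen_rows() compacts a fixed-width chunk in place: for each run of surviving rows between deleted offsets it memmoves the block toward the front and accumulates the number of bytes kept. Below is a standalone sketch of the same compaction over a std::vector<int32_t>, assuming the deleted offsets are sorted and unique; the real implementation works directly on the chunk buffer and its per-column element size.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Standalone sketch of the fixed-length vacuum: given sorted, unique offsets
// of deleted rows, slide every surviving block of rows toward the front of
// the buffer with memmove and return the number of bytes kept.
size_t vacuum_fixlen(std::vector<int32_t>& rows,
                     const std::vector<uint64_t>& deleted_offsets) {
  constexpr size_t element_size = sizeof(int32_t);
  auto* data = reinterpret_cast<char*>(rows.data());
  const size_t nrows = rows.size();
  size_t row_of_blk_to_keep = 0;  // head of the next block of surviving rows
  size_t row_of_blk_to_fill = 0;  // where that block should land
  size_t nbytes_kept = 0;
  for (size_t i = 0; i <= deleted_offsets.size(); ++i) {
    const bool is_last_one = (i == deleted_offsets.size());
    const size_t row_to_vacuum = is_last_one ? nrows : deleted_offsets[i];
    const size_t nrows_to_keep = row_to_vacuum - row_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      if (row_of_blk_to_fill != row_of_blk_to_keep) {
        std::memmove(data + row_of_blk_to_fill * element_size,
                     data + row_of_blk_to_keep * element_size,
                     nrows_to_keep * element_size);
      }
      row_of_blk_to_fill += nrows_to_keep;
      nbytes_kept += nrows_to_keep * element_size;
    }
    row_of_blk_to_keep = row_to_vacuum + 1;  // skip the deleted row itself
  }
  rows.resize(nbytes_kept / element_size);
  return nbytes_kept;
}

int main() {
  std::vector<int32_t> rows{10, 11, 12, 13, 14};
  vacuum_fixlen(rows, {1, 3});  // delete rows 1 and 3
  assert((rows == std::vector<int32_t>{10, 12, 14}));
}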


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1188 of file UpdelStorage.cpp.

References CHECK, Fragmenter_Namespace::get_buffer_offset(), Fragmenter_Namespace::get_null_padding(), Fragmenter_Namespace::get_var_len_null_array_indexes(), Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples(), and i.

Referenced by compactRows().

1191  {
1192  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1193  auto data_buffer = chunk->getBuffer();
1194  CHECK(data_buffer);
1195  auto index_buffer = chunk->getIndexBuf();
1196  CHECK(index_buffer);
1197  auto data_addr = data_buffer->getMemoryPtr();
1198  auto indices_addr = index_buffer->getMemoryPtr();
1199  CHECK(indices_addr);
1200  auto index_array = (StringOffsetT*)indices_addr;
1201  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1202  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1203  size_t nbytes_fix_data_to_keep = 0;
1204  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1205  size_t null_padding =
1206  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1207  size_t nbytes_var_data_to_keep = null_padding;
1208  auto null_array_indexes = get_var_len_null_array_indexes(
1209  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1210  auto nrows_to_vacuum = frag_offsets.size();
1211  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1212  auto is_last_one = irow == nrows_to_vacuum;
1213  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1214  auto maddr_to_vacuum = data_addr;
1215  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1216  if (nrows_to_keep > 0) {
1217  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1218  auto deleted_row_start_offset =
1219  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1220  auto kept_row_start_offset =
1221  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1222  auto nbytes_to_keep =
1223  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1224  kept_row_start_offset;
1225  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1226  if (nbytes_to_keep > 0) {
1227  CHECK(data_addr);
1228  // move curr varlen row block toward front
1229  memmove(data_addr + ibyte_var_data_to_keep,
1230  data_addr + kept_row_start_offset,
1231  nbytes_to_keep);
1232  }
1233 
1234  const auto base_offset = kept_row_start_offset;
1235  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1236  auto update_index = irow_of_blk_to_keep + i;
1237  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1238  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1239  }
1240  }
1241  nbytes_var_data_to_keep += nbytes_to_keep;
1242  maddr_to_vacuum = indices_addr;
1243 
1244  constexpr static auto index_element_size = sizeof(StringOffsetT);
1245  nbytes_to_keep = nrows_to_keep * index_element_size;
1246  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1247  // move curr fixlen row block toward front
1248  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1249  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1250  nbytes_to_keep);
1251  }
1252  irow_of_blk_to_fill += nrows_to_keep;
1253  nbytes_fix_data_to_keep += nbytes_to_keep;
1254  }
1255  irow_of_blk_to_keep = irow_to_vacuum + 1;
1256  }
1257 
1258  // Set expected null padding, last offset, and negative values for null array offsets.
1259  index_array[0] = null_padding;
1260  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1261  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1262  if (!is_varlen_array) {
1263  CHECK(null_array_indexes.empty());
1264  }
1265  for (auto index : null_array_indexes) {
1266  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1267  }
1268  return nbytes_var_data_to_keep;
1269 }
std::set< size_t > get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
int32_t StringOffsetT
Definition: sqltypes.h:957
#define CHECK(condition)
Definition: Logger.h:209
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
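vacuum_varlen_rows() has to move both the variable-length payload bytes and the string-offset index that describes them, and it also preserves null-array sentinels and padding. The copy-based sketch below illustrates only the core bookkeeping: drop the payloads of deleted rows, pack the survivors, and rebuild the offsets array; the in-place memmove strategy and null handling of the real implementation are omitted, and the helper name is hypothetical.

#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Standalone sketch of the variable-length vacuum's bookkeeping: row i's
// payload occupies [offsets[i], offsets[i + 1]) in `data`. Deleted rows
// (sorted, unique row indexes) are dropped, surviving payloads are packed to
// the front, and the offsets array is rebuilt to describe the packed buffer.
void vacuum_varlen(std::string& data,
                   std::vector<int32_t>& offsets,
                   const std::vector<uint64_t>& deleted_rows) {
  const size_t nrows = offsets.size() - 1;
  std::string packed;
  std::vector<int32_t> new_offsets{0};
  size_t next_deleted = 0;
  for (size_t row = 0; row < nrows; ++row) {
    if (next_deleted < deleted_rows.size() && deleted_rows[next_deleted] == row) {
      ++next_deleted;  // skip this row's payload entirely
      continue;
    }
    packed.append(data, offsets[row], offsets[row + 1] - offsets[row]);
    new_offsets.push_back(static_cast<int32_t>(packed.size()));
  }
  data = std::move(packed);
  offsets = std::move(new_offsets);
}

int main() {
  std::string data = "aabbbcc";
  std::vector<int32_t> offsets{0, 2, 5, 7};  // rows: "aa", "bbb", "cc"
  vacuum_varlen(data, offsets, {1});         // drop "bbb"
  assert(data == "aacc");
  assert((offsets == std::vector<int32_t>{0, 2, 4}));
}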


Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 187 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 189 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 210 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

Referenced by updateColumnMetadata(), and updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 191 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 212 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 208 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 215 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 200 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 198 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 213 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutableprotected

Definition at line 239 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 211 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 214 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files:

InsertOrderFragmenter.h
InsertOrderFragmenter.cpp
UpdelStorage.cpp