OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>

+ Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
+ Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
size_t getNumFragments () override
 returns the number of fragments in a table More...
 
TableInfo getFragmentsForQuery () override
 returns (inside a QueryInfo object) all ids and row sizes of fragments More...
 
void insertData (InsertData &insert_data_struct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertChunks (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments. More...
 
void insertDataNoCheckpoint (InsertData &insert_data_struct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void insertChunksNoCheckpoint (const InsertChunks &insert_chunk) override
 Insert chunks into minimal number of fragments; no locks or checkpoints taken. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
std::optional< ChunkUpdateStats > updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
void resetSizesFromFragments () override
 
void alterNonGeoColumnType (const std::list< const ColumnDescriptor * > &columns)
 
void alterColumnGeoType (const std::list< std::pair< const ColumnDescriptor *, std::list< const ColumnDescriptor * >>> &src_dst_column_pairs)
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insert_data)
 
void insertChunksImpl (const InsertChunks &insert_chunk)
 
void addColumns (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfogetFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::vector< std::unique_ptr< Chunk_NS::Chunk > > tracked_in_memory_chunks_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
heavyai::shared_mutex fragmentInfoMutex_
 
heavyai::shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Private Member Functions

bool isAddingNewColumns (const InsertData &insert_data) const
 
void dropFragmentsToSizeNoInsertLock (const size_t max_rows)
 
void setLastFragmentVarLenColumnSizes ()
 
void insertChunksIntoFragment (const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 98 of file InsertOrderFragmenter.cpp.

98 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter & )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::addColumns ( const InsertData &  insertDataStruct)
protected

Definition at line 832 of file InsertOrderFragmenter.cpp.

References CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

832  {
833  // synchronize concurrent accesses to fragmentInfoVec_
835  size_t numRowsLeft = insertDataStruct.numRows;
836  for (const auto columnId : insertDataStruct.columnIds) {
837  CHECK(columnMap_.end() == columnMap_.find(columnId));
838  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
839  CHECK(columnDesc);
840  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
841  }
842  try {
843  for (auto const& fragmentInfo : fragmentInfoVec_) {
844  fragmentInfo->shadowChunkMetadataMap =
845  fragmentInfo->getChunkMetadataMapPhysicalCopy();
846  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
847  size_t numRowsCanBeInserted;
848  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
849  auto columnId = insertDataStruct.columnIds[i];
850  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
851  CHECK(colDesc);
852  CHECK(columnMap_.find(columnId) != columnMap_.end());
853 
854  ChunkKey chunkKey = chunkKeyPrefix_;
855  chunkKey.push_back(columnId);
856  chunkKey.push_back(fragmentInfo->fragmentId);
857 
858  auto colMapIt = columnMap_.find(columnId);
859  auto& chunk = colMapIt->second;
860  if (chunk.isChunkOnDevice(
861  dataMgr_,
862  chunkKey,
864  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
865  dataMgr_->deleteChunksWithPrefix(chunkKey);
866  }
867  chunk.createChunkBuffer(
868  dataMgr_,
869  chunkKey,
871  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
872  chunk.initEncoder();
873 
874  try {
875  DataBlockPtr dataCopy = insertDataStruct.data[i];
876  auto size = colDesc->columnType.get_size();
877  if (0 > size) {
878  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
879  varLenColInfo_[columnId] = 0;
880  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
881  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
882  } else {
883  numRowsCanBeInserted = maxChunkSize_ / size;
884  }
885 
886  // FIXME: abort a case in which new column is wider than existing columns
887  if (numRowsCanBeInserted < numRowsToInsert) {
888  throw std::runtime_error("new column '" + colDesc->columnName +
889  "' wider than existing columns is not supported");
890  }
891 
892  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
893  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
894 
895  // update total size of var-len column in (actually the last) fragment
896  if (0 > size) {
897  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
898  varLenColInfo_[columnId] = chunk.getBuffer()->size();
899  }
900  } catch (...) {
901  dataMgr_->deleteChunksWithPrefix(chunkKey);
902  throw;
903  }
904  }
905  numRowsLeft -= numRowsToInsert;
906  }
907  CHECK(0 == numRowsLeft);
908  } catch (const std::exception& e) {
909  for (const auto columnId : insertDataStruct.columnIds) {
910  columnMap_.erase(columnId);
911  }
912  throw e;
913  }
914 
915  for (auto const& fragmentInfo : fragmentInfoVec_) {
916  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
917  }
918 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:522
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::alterColumnGeoType ( const std::list< std::pair< const ColumnDescriptor *, std::list< const ColumnDescriptor * >>> &  src_dst_column_pairs)

Definition at line 1378 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, and Data_Namespace::DISK_LEVEL.

1381  {
1384  << "`alterColumnTypeTransactional` only supported for regular tables";
1386 
1387  for (const auto& [src_cd, dst_columns] : src_dst_column_pairs) {
1388  auto logical_geo_column = *dst_columns.begin();
1389  CHECK(logical_geo_column->columnType.is_geometry());
1390 
1391  columnMap_.erase(
1392  src_cd->columnId); // NOTE: Necessary to prevent unpinning issues with these
1393  // chunks when fragmenter is destroyed later.
1394 
1395  for (const auto& fragment_info : fragmentInfoVec_) {
1396  int device_id = fragment_info->deviceIds[static_cast<int>(defaultInsertLevel_)];
1397  auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1398 
1399  CHECK_GE(dst_columns.size(), 1UL);
1400 
1401  std::list<const ColumnDescriptor*> columns = dst_columns;
1402  GeoAlterColumnContext alter_column_context{device_id,
1404  fragment_info.get(),
1405  src_cd,
1406  *dst_columns.begin(),
1407  num_elements,
1408  dataMgr_,
1409  catalog_,
1410  columnMap_,
1411  columns};
1412 
1413  alter_column_context.readSourceData();
1414 
1415  alter_column_context.createScratchBuffers();
1416 
1417  ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1418 
1419  const bool geo_validate_geometry = false;
1420  alter_column_context.encodeData(geo_validate_geometry);
1421 
1422  alter_column_context.putBuffersToDisk();
1423  }
1424  }
1425 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::unique_lock< T > unique_lock
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::alterNonGeoColumnType ( const std::list< const ColumnDescriptor * > &  columns)

Definition at line 1427 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, ddl_utils::alter_column_utils::compare_column_descriptors(), and Data_Namespace::DISK_LEVEL.

1428  {
1431  << "`alterColumnTypeTransactional` only supported for regular tables";
1432 
1434 
1435  for (const auto dst_cd : columns) {
1436  auto col_it = columnMap_.find(dst_cd->columnId);
1437  CHECK(col_it != columnMap_.end());
1438 
1439  auto src_cd = col_it->second.getColumnDesc();
1440  CHECK_EQ(col_it->first, src_cd->columnId);
1441 
1443  .sql_types_match) {
1444  continue;
1445  }
1446 
1447  for (const auto& fragment_info : fragmentInfoVec_) {
1448  int device_id = fragment_info->deviceIds[static_cast<int>(defaultInsertLevel_)];
1449  auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1450 
1451  NonGeoAlterColumnContext alter_column_context{device_id,
1453  fragment_info.get(),
1454  src_cd,
1455  dst_cd,
1456  num_elements,
1457  dataMgr_,
1458  catalog_,
1459  columnMap_};
1460 
1461  alter_column_context.readSourceData();
1462 
1463  alter_column_context.createScratchBuffers();
1464 
1465  ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1466 
1467  alter_column_context.reencodeData();
1468 
1469  alter_column_context.putBuffersToDisk();
1470  }
1471  }
1472 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
CompareResult compare_column_descriptors(const ColumnDescriptor *lhs, const ColumnDescriptor *rhs)
Definition: DdlUtils.cpp:52
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unique_lock< T > unique_lock
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1275 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), getChunksForAllColumns(), getFragmentInfo(), Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), UpdelRoll::setNumTuple(), report::stats, updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1280  {
1281  auto fragment_ptr = getFragmentInfo(fragment_id);
1282  auto& fragment = *fragment_ptr;
1283  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1284  const auto ncol = chunks.size();
1285 
1286  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1287 
1288  // parallel delete columns
1289  std::vector<std::future<void>> threads;
1290  auto nrows_to_vacuum = frag_offsets.size();
1291  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1292  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1293 
1294  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1295  auto chunk = chunks[ci];
1296  const auto cd = chunk->getColumnDesc();
1297  const auto& col_type = cd->columnType;
1298  auto data_buffer = chunk->getBuffer();
1299  auto index_buffer = chunk->getIndexBuf();
1300  auto data_addr = data_buffer->getMemoryPtr();
1301  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1302  auto index_array = (StringOffsetT*)indices_addr;
1303  bool is_varlen = col_type.is_varlen_indeed();
1304 
1305  auto fixlen_vacuum =
1306  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1307  size_t nbytes_fix_data_to_keep;
1308  if (nrows_to_keep == 0) {
1309  nbytes_fix_data_to_keep = 0;
1310  } else {
1311  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1312  }
1313 
1314  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1315  data_buffer->setSize(nbytes_fix_data_to_keep);
1316  data_buffer->setUpdated();
1317 
1318  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1319 
1320  auto daddr = data_addr;
1321  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1322  : get_element_size(col_type);
1323  data_buffer->getEncoder()->resetChunkStats();
1324  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1325  if (col_type.is_fixlen_array()) {
1326  auto encoder =
1327  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1328  CHECK(encoder);
1329  encoder->updateMetadata((int8_t*)daddr);
1330  } else if (col_type.is_fp()) {
1331  set_chunk_stats(col_type,
1332  daddr,
1333  update_stats_per_thread[ci].new_values_stats.has_null,
1334  update_stats_per_thread[ci].new_values_stats.min_double,
1335  update_stats_per_thread[ci].new_values_stats.max_double);
1336  } else {
1337  set_chunk_stats(col_type,
1338  daddr,
1339  update_stats_per_thread[ci].new_values_stats.has_null,
1340  update_stats_per_thread[ci].new_values_stats.min_int64t,
1341  update_stats_per_thread[ci].new_values_stats.max_int64t);
1342  }
1343  }
1344  };
1345 
1346  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1347  size_t nbytes_var_data_to_keep;
1348  if (nrows_to_keep == 0) {
1349  nbytes_var_data_to_keep = 0;
1350  } else {
1351  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1352  }
1353 
1354  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1355  data_buffer->setSize(nbytes_var_data_to_keep);
1356  data_buffer->setUpdated();
1357 
1358  index_buffer->setSize(sizeof(*index_array) *
1359  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1360  index_buffer->setUpdated();
1361 
1362  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1363  };
1364 
1365  if (is_varlen) {
1366  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1367  } else {
1368  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1369  }
1370  if (threads.size() >= (size_t)cpu_threads()) {
1371  wait_cleanup_threads(threads);
1372  }
1373  }
1374 
1375  wait_cleanup_threads(threads);
1376 
1377  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1378  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1379  auto chunk = chunks[ci];
1380  auto cd = chunk->getColumnDesc();
1381  if (!cd->columnType.is_fixlen_array()) {
1382  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1383  // stored in seconds. Do the metadata conversion here before updating the chunk
1384  // stats.
1385  if (cd->columnType.is_date_in_days()) {
1386  auto& stats = update_stats_per_thread[ci].new_values_stats;
1387  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1388  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1389  }
1391  fragment,
1392  chunk,
1393  update_stats_per_thread[ci].new_values_stats,
1394  cd->columnType,
1395  updel_roll);
1396  }
1397  }
1398 }
dictionary stats
Definition: report.py:116
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:1493
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:291
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
int cpu_threads()
Definition: thread_count.h:25
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 481 of file InsertOrderFragmenter.cpp.

References Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

481  {
482  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
483  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
484  // storage per table
485  if (!uses_foreign_storage_ &&
487  const TableDescriptor* td =
488  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
489  File_Namespace::FileMgrParams fileMgrParams;
490  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
492  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
493  }
494 }
int32_t maxRollbackEpochs
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:649
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 1269 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::CPU_LEVEL, and Fragmenter_Namespace::FragmentInfo::fragmentId.

1270  {
1271  // also sets the new fragment as the insertBuffer for each column
1272 
1273  maxFragmentId_++;
1274  auto newFragmentInfo = std::make_unique<FragmentInfo>();
1275  newFragmentInfo->fragmentId = maxFragmentId_;
1276  newFragmentInfo->shadowNumTuples = 0;
1277  newFragmentInfo->setPhysicalNumTuples(0);
1278  for (const auto levelSize : dataMgr_->levelSizes_) {
1279  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
1280  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
1281  }
1282  newFragmentInfo->physicalTableId = physicalTableId_;
1283  newFragmentInfo->shard = shard_;
1284 
1285  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
1286  colMapIt != columnMap_.end();
1287  ++colMapIt) {
1288  auto& chunk = colMapIt->second;
1289  if (memoryLevel == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1290  /* At the end of this function chunks from the previous fragment become 'rolled
1291  * off', temporaray tables will lose reference to any 'rolled off' chunks and are
1292  * not able to unpin these chunks. Keep reference to 'rolled off' chunks and unpin
1293  * at ~InsertOrderFragmenter, chunks wrapped by unique_ptr to avoid extraneous
1294  * ~Chunk calls with temporary chunks.*/
1295  tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
1296  }
1297  ChunkKey chunkKey = chunkKeyPrefix_;
1298  chunkKey.push_back(chunk.getColumnDesc()->columnId);
1299  chunkKey.push_back(maxFragmentId_);
1300  chunk.createChunkBuffer(dataMgr_,
1301  chunkKey,
1302  memoryLevel,
1303  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
1304  pageSize_);
1305  chunk.initEncoder();
1306  }
1307 
1309  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
1310  return fragmentInfoVec_.back().get();
1311 }
std::lock_guard< T > lock_guard
std::vector< int > ChunkKey
Definition: types.h:36
std::vector< int > levelSizes_
Definition: DataMgr.h:240
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::vector< std::unique_ptr< Chunk_NS::Chunk > > tracked_in_memory_chunks_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 602 of file InsertOrderFragmenter.cpp.

References lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

602  {
603  // Fix a verified loophole on sharded logical table which is locked using logical
604  // tableId while it's its physical tables that can come here when fragments overflow
605  // during COPY. Locks on a logical table and its physical tables never intersect, which
606  // means potential races. It'll be an overkill to resolve a logical table to physical
607  // tables in DBHandler, ParseNode or other higher layers where the logical table is
608  // locked with Table Read/Write locks; it's easier to lock the logical table of its
609  // physical tables. A downside of this approach may be loss of parallel execution of
610  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
611  // operation, the loss seems not a big deal.
612  auto chunkKeyPrefix = chunkKeyPrefix_;
613  if (shard_ >= 0) {
614  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
615  }
616 
617  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
618  // SELECT and COPY may enter a deadlock
619  const auto delete_lock =
621 
623 
624  for (const auto fragId : dropFragIds) {
625  for (const auto& col : columnMap_) {
626  int colId = col.first;
627  vector<int> fragPrefix = chunkKeyPrefix_;
628  fragPrefix.push_back(colId);
629  fragPrefix.push_back(fragId);
630  dataMgr_->deleteChunksWithPrefix(fragPrefix);
631  }
632  }
633 }
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
std::unique_lock< T > unique_lock
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:5008
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:522
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 920 of file InsertOrderFragmenter.cpp.

920  {
921  // prevent concurrent insert rows and drop column
923  // synchronize concurrent accesses to fragmentInfoVec_
925  for (auto const& fragmentInfo : fragmentInfoVec_) {
926  fragmentInfo->shadowChunkMetadataMap =
927  fragmentInfo->getChunkMetadataMapPhysicalCopy();
928  }
929 
930  for (const auto columnId : columnIds) {
931  auto cit = columnMap_.find(columnId);
932  if (columnMap_.end() != cit) {
933  columnMap_.erase(cit);
934  }
935 
936  vector<int> fragPrefix = chunkKeyPrefix_;
937  fragPrefix.push_back(columnId);
938  dataMgr_->deleteChunksWithPrefix(fragPrefix);
939 
940  for (const auto& fragmentInfo : fragmentInfoVec_) {
941  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
942  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
943  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
944  }
945  }
946  }
947  for (const auto& fragmentInfo : fragmentInfoVec_) {
948  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
949  }
950 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unique_lock< T > unique_lock
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:522
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 569 of file InsertOrderFragmenter.cpp.

569  {
572 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock ( const size_t  max_rows)
private

Definition at line 574 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

574  {
575  // not safe to call from outside insertData
576  // b/c depends on insertLock around numTuples_
577 
578  // don't ever drop the only fragment!
579  if (fragmentInfoVec_.empty() ||
580  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
581  return;
582  }
583 
584  if (numTuples_ > max_rows) {
585  size_t preNumTuples = numTuples_;
586  vector<int> dropFragIds;
587  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
588  while (numTuples_ > targetRows) {
589  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
590  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
591  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
592  fragmentInfoVec_.pop_front();
593  CHECK_GE(numTuples_, numFragTuples);
594  numTuples_ -= numFragTuples;
595  }
596  deleteFragments(dropFragIds);
597  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
598  << " post: " << numTuples_ << " maxRows: " << max_rows;
599  }
600 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:285
#define CHECK_GE(x, y)
Definition: Logger.h:306
#define CHECK_GT(x, y)
Definition: Logger.h:305
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 120 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 496 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

496  {
497  if (uses_foreign_storage_ ||
499  // memory-resident tables won't have anything on disk
500  ChunkMetadataVector chunk_metadata;
502 
503  // data comes like this - database_id, table_id, column_id, fragment_id
504  // but lets sort by database_id, table_id, fragment_id, column_id
505 
506  int fragment_subkey_index = 3;
507  std::sort(chunk_metadata.begin(),
508  chunk_metadata.end(),
509  [&](const auto& pair1, const auto& pair2) {
510  return pair1.first[3] < pair2.first[3];
511  });
512 
513  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
514  ++chunk_itr) {
515  int cur_column_id = chunk_itr->first[2];
516  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
517 
518  if (fragmentInfoVec_.empty() ||
519  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
520  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
521  CHECK(new_fragment_info);
522  maxFragmentId_ = cur_fragment_id;
523  new_fragment_info->fragmentId = cur_fragment_id;
524  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
525  numTuples_ += new_fragment_info->getPhysicalNumTuples();
526  for (const auto level_size : dataMgr_->levelSizes_) {
527  new_fragment_info->deviceIds.push_back(
528  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
529  }
530  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
531  new_fragment_info->physicalTableId = physicalTableId_;
532  new_fragment_info->shard = shard_;
533  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
534  } else {
535  if (chunk_itr->second->numElements !=
536  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
537  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
538  std::to_string(physicalTableId_) + ", Column " +
539  std::to_string(cur_column_id) + ". Fragment Tuples: " +
541  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
542  ", Chunk Tuples: " +
543  std::to_string(chunk_itr->second->numElements);
544  }
545  }
546  CHECK(fragmentInfoVec_.back().get());
547  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
548  }
549  }
550 
551  size_t maxFixedColSize = 0;
552 
553  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
554  auto size = colIt->second.getColumnDesc()->columnType.get_size();
555  if (size == -1) { // variable length
556  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
557  size = 8; // b/c we use this for string and array indices - gross to have magic
558  // number here
559  }
560  CHECK_GE(size, 0);
561  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
562  }
563 
564  // this is maximum number of rows assuming everything is fixed length
565  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
567 }
std::vector< int > levelSizes_
Definition: DataMgr.h:240
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:285
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:496
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor td,
const FragmentInfo fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 988 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

991  {
992  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
993  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
994  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
995  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
996  ++ncol;
997  if (!cd->isVirtualCol) {
998  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
999  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1000  ChunkKey chunk_key{
1001  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1002  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1003  &catalog_->getDataMgr(),
1004  chunk_key,
1005  memory_level,
1006  0,
1007  chunk_meta_it->second->numBytes,
1008  chunk_meta_it->second->numElements);
1009  chunks.push_back(chunk);
1010  }
1011  }
1012  }
1013  return chunks;
1014 }
std::vector< int > ChunkKey
Definition: types.h:36
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:266
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:291
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 119 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

119 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 124 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 739 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

739  {
740  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
741  fragmentInfoVec_.end(),
742  [fragment_id](const auto& fragment) -> bool {
743  return fragment->fragmentId == fragment_id;
744  });
745  CHECK(fragment_it != fragmentInfoVec_.end());
746  return fragment_it->get();
747 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the caller graph for this function:

FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row sizes of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1318 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

1318  {
1320  TableInfo queryInfo;
1321  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
1322  // right now we don't test predicate, so just return (copy of) all fragments
1323  bool fragmentsExist = false;
1324  if (fragmentInfoVec_.empty()) {
1325  // If we have no fragments add a dummy empty fragment to make the executor
1326  // not have separate logic for 0-row tables
1327  int maxFragmentId = 0;
1328  FragmentInfo emptyFragmentInfo;
1329  emptyFragmentInfo.fragmentId = maxFragmentId;
1330  emptyFragmentInfo.shadowNumTuples = 0;
1331  emptyFragmentInfo.setPhysicalNumTuples(0);
1332  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
1333  emptyFragmentInfo.physicalTableId = physicalTableId_;
1334  emptyFragmentInfo.shard = shard_;
1335  queryInfo.fragments.push_back(emptyFragmentInfo);
1336  } else {
1337  fragmentsExist = true;
1338  std::for_each(
1339  fragmentInfoVec_.begin(),
1340  fragmentInfoVec_.end(),
1341  [&queryInfo](const auto& fragment_owned_ptr) {
1342  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
1343  });
1344  }
1345  readLock.unlock();
1346  queryInfo.setPhysicalNumTuples(0);
1347  auto partIt = queryInfo.fragments.begin();
1348  if (fragmentsExist) {
1349  while (partIt != queryInfo.fragments.end()) {
1350  if (partIt->getPhysicalNumTuples() == 0) {
1351  // this means that a concurrent insert query inserted tuples into a new fragment
1352  // but when the query came in we didn't have this fragment. To make sure we
1353  // don't mess up the executor we delete this fragment from the metadatamap
1354  // (fixes earlier bug found 2015-05-08)
1355  partIt = queryInfo.fragments.erase(partIt);
1356  } else {
1357  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1358  partIt->getPhysicalNumTuples());
1359  ++partIt;
1360  }
1361  }
1362  } else {
1363  // We added a dummy fragment and know the table is empty
1364  queryInfo.setPhysicalNumTuples(0);
1365  }
1366  return queryInfo;
1367 }
std::vector< int > levelSizes_
Definition: DataMgr.h:240
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock

+ Here is the call graph for this function:

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumFragments ( )
overridevirtual

returns the number of fragments in a table

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1313 of file InsertOrderFragmenter.cpp.

1313  {
1315  return fragmentInfoVec_.size();
1316 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1017 of file UpdelStorage.cpp.

References threading_serial::async(), CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1018  {
1019  const auto data_buffer = chunk->getBuffer();
1020  const auto data_addr = data_buffer->getMemoryPtr();
1021  const size_t nrows_in_chunk = data_buffer->size();
1022  const size_t ncore = cpu_threads();
1023  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1024  std::vector<std::vector<uint64_t>> deleted_offsets;
1025  deleted_offsets.resize(ncore);
1026  std::vector<std::future<void>> threads;
1027  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1028  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1029  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1030  const auto ithread = rbegin / segsz;
1031  CHECK(ithread < deleted_offsets.size());
1032  deleted_offsets[ithread].reserve(segsz);
1033  for (size_t r = rbegin; r < rend; ++r) {
1034  if (data_addr[r]) {
1035  deleted_offsets[ithread].push_back(r);
1036  }
1037  }
1038  }));
1039  }
1040  wait_cleanup_threads(threads);
1041  std::vector<uint64_t> all_deleted_offsets;
1042  for (size_t i = 0; i < ncore; ++i) {
1043  all_deleted_offsets.insert(
1044  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1045  }
1046  return all_deleted_offsets;
1047 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
#define CHECK(condition)
Definition: Logger.h:291
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 952 of file InsertOrderFragmenter.cpp.

References CHECK.

952  {
954 
955  for (auto const& fragment : fragmentInfoVec_) {
956  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
957  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
958  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
959  if (chunk_stats.max.tinyintval == 1) {
960  return true;
961  }
962  }
963  return false;
964 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
#define CHECK(condition)
Definition: Logger.h:291
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunks ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments.

Parameters
insert_chunk- the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 764 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertChunks::db_id, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertChunks::table_id.

764  {
765  try {
766  // prevent two threads from trying to insert into the same table simultaneously
768  insertChunksImpl(insert_chunk);
769  if (defaultInsertLevel_ ==
770  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
772  chunkKeyPrefix_[0],
773  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
774  }
775  } catch (...) {
776  auto db_id = insert_chunk.db_id;
777  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
778  // the statement below deletes *this* object!
779  // relying on exception propagation at this stage
780  // until we can sort this out in a cleaner fashion
781  catalog_->setTableEpochs(db_id, table_epochs);
782  throw;
783  }
784 }
void insertChunksImpl(const InsertChunks &insert_chunk)
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:584
std::unique_lock< T > unique_lock
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3849
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3821
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksImpl ( const InsertChunks insert_chunk)
protected

Definition at line 1036 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::get_num_rows_to_insert(), and Fragmenter_Namespace::InsertChunks::valid_row_indices.

1036  {
1037  std::optional<int> delete_column_id{std::nullopt};
1038  for (const auto& cit : columnMap_) {
1039  if (cit.second.getColumnDesc()->isDeletedCol) {
1040  delete_column_id = cit.second.getColumnDesc()->columnId;
1041  }
1042  }
1043 
1044  // verify that all chunks to be inserted have same number of rows, otherwise the input
1045  // data is malformed
1046  std::optional<size_t> num_rows{std::nullopt};
1047  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
1048  auto buffer = chunk->getBuffer();
1049  CHECK(buffer);
1050  CHECK(buffer->hasEncoder());
1051  if (!num_rows.has_value()) {
1052  num_rows = buffer->getEncoder()->getNumElems();
1053  } else {
1054  CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
1055  }
1056  }
1057 
1058  auto valid_row_indices = insert_chunks.valid_row_indices;
1059  size_t num_rows_left = valid_row_indices.size();
1060  size_t num_rows_inserted = 0;
1061 
1062  if (num_rows_left == 0) {
1063  return;
1064  }
1065 
1066  FragmentInfo* current_fragment{nullptr};
1067 
1068  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
1069  // feels fragile
1070  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
1071  current_fragment = createNewFragment(defaultInsertLevel_);
1072  } else {
1073  current_fragment = fragmentInfoVec_.back().get();
1074  }
1075  CHECK(current_fragment);
1076 
1077  size_t start_fragment = fragmentInfoVec_.size() - 1;
1078 
1079  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
1080  // loop until done inserting all rows
1081  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
1082  size_t rows_left_in_current_fragment =
1083  maxFragmentRows_ - current_fragment->shadowNumTuples;
1084  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
1085  num_rows_left,
1086  num_rows_inserted,
1088  maxChunkSize_,
1089  insert_chunks,
1090  columnMap_,
1091  valid_row_indices);
1092 
1093  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
1094  current_fragment = createNewFragment(defaultInsertLevel_);
1095  if (num_rows_inserted == 0) {
1096  start_fragment++;
1097  }
1098  rows_left_in_current_fragment = maxFragmentRows_;
1099  for (auto& varLenColInfoIt : varLenColInfo_) {
1100  varLenColInfoIt.second = 0; // reset byte counter
1101  }
1102  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
1103  num_rows_left,
1104  num_rows_inserted,
1105  varLenColInfo_,
1106  maxChunkSize_,
1107  insert_chunks,
1108  columnMap_,
1109  valid_row_indices);
1110  }
1111 
1112  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
1113  // never be able to insert anything
1114 
1115  insertChunksIntoFragment(insert_chunks,
1116  delete_column_id,
1117  current_fragment,
1118  num_rows_to_insert,
1119  num_rows_inserted,
1120  num_rows_left,
1121  valid_row_indices,
1122  start_fragment);
1123  }
1124  numTuples_ += *num_rows;
1126 }
void insertChunksIntoFragment(const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:305
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment, const size_t num_rows_left, const size_t num_rows_inserted, const std::unordered_map< int, size_t > &var_len_col_info, const size_t max_chunk_size, const InsertChunks &insert_chunks, std::map< int, Chunk_NS::Chunk > &column_map, const std::vector< size_t > &valid_row_indices)

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksIntoFragment ( const InsertChunks insert_chunks,
const std::optional< int >  delete_column_id,
FragmentInfo current_fragment,
const size_t  num_rows_to_insert,
size_t &  num_rows_inserted,
size_t &  num_rows_left,
std::vector< size_t > &  valid_row_indices,
const size_t  start_fragment 
)
private

Definition at line 966 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_EQ, CHECK_GE, CHECK_LE, Fragmenter_Namespace::InsertChunks::chunks, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

974  {
976  // for each column, append the data in the appropriate insert buffer
977  auto insert_row_indices = valid_row_indices;
978  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
979  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
980  insert_row_indices.end());
981  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
982  for (auto& [column_id, chunk] : insert_chunks.chunks) {
983  auto col_map_it = columnMap_.find(column_id);
984  CHECK(col_map_it != columnMap_.end());
985  current_fragment->shadowChunkMetadataMap[column_id] =
986  col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
987  auto var_len_col_info_it = varLenColInfo_.find(column_id);
988  if (var_len_col_info_it != varLenColInfo_.end()) {
989  var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
990  CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
991  }
992  }
993  if (hasMaterializedRowId_) {
994  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
995  current_fragment->shadowNumTuples;
996  std::vector<int64_t> row_id_data(num_rows_to_insert);
997  for (size_t i = 0; i < num_rows_to_insert; ++i) {
998  row_id_data[i] = i + start_id;
999  }
1000  DataBlockPtr row_id_block;
1001  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
1002  auto col_map_it = columnMap_.find(rowIdColId_);
1003  CHECK(col_map_it != columnMap_.end());
1004  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
1005  row_id_block, num_rows_to_insert, num_rows_inserted);
1006  }
1007 
1008  if (delete_column_id) { // has delete column
1009  std::vector<int8_t> delete_data(num_rows_to_insert, false);
1010  DataBlockPtr delete_block;
1011  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
1012  auto col_map_it = columnMap_.find(*delete_column_id);
1013  CHECK(col_map_it != columnMap_.end());
1014  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
1015  col_map_it->second.appendData(
1016  delete_block, num_rows_to_insert, num_rows_inserted);
1017  }
1018 
1019  current_fragment->shadowNumTuples =
1020  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
1021  num_rows_left -= num_rows_to_insert;
1022  num_rows_inserted += num_rows_to_insert;
1023  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
1024  part_it != fragmentInfoVec_.end();
1025  ++part_it) {
1026  auto fragment_ptr = part_it->get();
1027  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1028  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
1029  }
1030 
1031  // truncate the first `num_rows_to_insert` rows in `valid_row_indices`
1032  valid_row_indices.erase(valid_row_indices.begin(),
1033  valid_row_indices.begin() + num_rows_to_insert);
1034 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::unique_lock< T > unique_lock
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:233
void Fragmenter_Namespace::InsertOrderFragmenter::insertChunksNoCheckpoint ( const InsertChunks insert_chunk)
overridevirtual

Insert chunks into minimal number of fragments; no locks or checkpoints taken.

Parameters
insert_chunk- the chunks to insert

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 812 of file InsertOrderFragmenter.cpp.

812  {
813  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
815  insertMutex_); // prevent two threads from trying to insert into the same table
816  // simultaneously
817  insertChunksImpl(insert_chunk);
818 }
void insertChunksImpl(const InsertChunks &insert_chunk)
std::unique_lock< T > unique_lock
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData insert_data_struct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 786 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

786  {
787  try {
788  // prevent two threads from trying to insert into the same table simultaneously
790  if (!isAddingNewColumns(insert_data_struct)) {
791  insertDataImpl(insert_data_struct);
792  } else {
793  addColumns(insert_data_struct);
794  }
795  if (defaultInsertLevel_ ==
796  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
798  chunkKeyPrefix_[0],
799  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
800  }
801  } catch (...) {
802  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
803  insert_data_struct.tableId);
804  // the statement below deletes *this* object!
805  // relying on exception propagation at this stage
806  // until we can sort this out in a cleaner fashion
807  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
808  throw;
809  }
810 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:584
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3849
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3821

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData insert_data)
protected

Definition at line 1128 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::InsertData::is_default, DataBlockPtr::numbersPtr, and Fragmenter_Namespace::InsertData::numRows.

// Body of InsertOrderFragmenter::insertDataImpl (InsertOrderFragmenter.cpp, lines 1128-1267).
// Appends insert_data.numRows rows to the table, filling the last fragment first
// and creating further fragments as needed.
//
// Steps visible in this listing:
//   1. If columnMap_ contains a deleted-flag column, a zero-filled block for it
//      is appended to insert_data (data, columnIds and is_default).
//   2. A reverse map from column id to its position in insert_data is built.
//   3. In a loop, the number of rows to insert is bounded by the fragment's
//      remaining row capacity and, for every variable-length column, by the
//      bytes left under maxChunkSize_. When nothing fits, a new fragment is
//      created and the bounds are recomputed.
//   4. Each column's data is appended, row ids are generated when
//      hasMaterializedRowId_ is set, and the shadow counters/metadata are
//      copied to their physical counterparts.
//   5. numTuples_ is increased by insert_data.numRows.
//
// NOTE(review): source lines 1225 and 1266 are missing from this listing; going by
// the unique_lock and dropFragmentsToSizeNoInsertLock entries referenced for this
// function they are presumably a lock acquisition and a size-trimming call --
// confirm against InsertOrderFragmenter.cpp.
1128  {
1129  // populate deleted system column if it should exist, as it will not come from client
1130  std::unique_ptr<int8_t[]> data_for_deleted_column;
1131  for (const auto& cit : columnMap_) {
1132  if (cit.second.getColumnDesc()->isDeletedCol) {
1133  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
1134  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
// NOTE(review): insert_data receives a raw pointer into a buffer owned by the local
// data_for_deleted_column; if insert_data is the caller's object, that pointer is
// only valid until this function returns -- confirm callers do not reuse it.
1135  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
1136  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
1137  insert_data.is_default.push_back(false);
1138  break;
1139  }
1140  }
1141  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
// Maps a column id to the index of that column inside insert_data.
1142  std::unordered_map<int, int> inverseInsertDataColIdMap;
1143  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
1144  inverseInsertDataColIdMap.insert(
1145  std::make_pair(insert_data.columnIds[insertId], insertId));
1146  }
1147 
1148  size_t numRowsLeft = insert_data.numRows;
1149  size_t numRowsInserted = 0;
1150  vector<DataBlockPtr> dataCopy =
1151  insert_data.data; // bc append data will move ptr forward and this violates
1152  // constness of InsertData
// numRowsLeft is unsigned, so this test is true only when it is exactly zero.
1153  if (numRowsLeft <= 0) {
1154  return;
1155  }
1156 
1157  FragmentInfo* currentFragment{nullptr};
1158 
1159  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
1160  // feels fragile
1161  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
1162  currentFragment = createNewFragment(defaultInsertLevel_);
1163  } else {
1164  currentFragment = fragmentInfoVec_.back().get();
1165  }
1166  CHECK(currentFragment);
1167 
// Index of the first fragment whose physical counters are refreshed at the end of
// each loop iteration (see the loop starting at line 1256).
1168  size_t startFragment = fragmentInfoVec_.size() - 1;
1169 
1170  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
1171  // loop until done inserting all rows
1172  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
1173  size_t rowsLeftInCurrentFragment =
1174  maxFragmentRows_ - currentFragment->shadowNumTuples;
1175  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
1176  if (rowsLeftInCurrentFragment != 0) {
// Shrink numRowsToInsert so that no variable-length column present in this
// insert exceeds maxChunkSize_ bytes.
1177  for (auto& varLenColInfoIt : varLenColInfo_) {
1178  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1179  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1180  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1181  if (insertIdIt != inverseInsertDataColIdMap.end()) {
1182  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1183  numRowsToInsert = std::min(numRowsToInsert,
1184  colMapIt->second.getNumElemsForBytesInsertData(
1185  dataCopy[insertIdIt->second],
1186  numRowsToInsert,
1187  numRowsInserted,
1188  bytesLeft,
1189  insert_data.is_default[insertIdIt->second]));
1190  }
1191  }
1192  }
1193 
// Nothing fits in the current fragment: start a new one and redo the same
// row/byte computation against its empty chunks.
1194  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
1195  currentFragment = createNewFragment(defaultInsertLevel_);
1196  if (numRowsInserted == 0) {
1197  startFragment++;
1198  }
1199  rowsLeftInCurrentFragment = maxFragmentRows_;
1200  for (auto& varLenColInfoIt : varLenColInfo_) {
1201  varLenColInfoIt.second = 0; // reset byte counter
1202  }
1203  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
1204  for (auto& varLenColInfoIt : varLenColInfo_) {
1205  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1206  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1207  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1208  if (insertIdIt != inverseInsertDataColIdMap.end()) {
1209  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1210  numRowsToInsert = std::min(numRowsToInsert,
1211  colMapIt->second.getNumElemsForBytesInsertData(
1212  dataCopy[insertIdIt->second],
1213  numRowsToInsert,
1214  numRowsInserted,
1215  bytesLeft,
1216  insert_data.is_default[insertIdIt->second]));
1217  }
1218  }
1219  }
1220 
1221  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
1222  // never be able to insert anything
1223 
1224  {
// NOTE(review): line 1225 is not shown in this listing.
1226  // for each column, append the data in the appropriate insert buffer
1227  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
1228  int columnId = insert_data.columnIds[i];
1229  auto colMapIt = columnMap_.find(columnId);
1230  CHECK(colMapIt != columnMap_.end());
1231  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
1232  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
// Keep the per-column byte counter in step with the buffer just appended to.
1233  auto varLenColInfoIt = varLenColInfo_.find(columnId);
1234  if (varLenColInfoIt != varLenColInfo_.end()) {
1235  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
1236  }
1237  }
1238  if (hasMaterializedRowId_) {
// Row ids are consecutive, starting at
// maxFragmentRows_ * fragmentId + rows already in the fragment.
1239  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
1240  currentFragment->shadowNumTuples;
1241  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
1242  for (size_t i = 0; i < numRowsToInsert; ++i) {
1243  row_id_data[i] = i + startId;
1244  }
1245  DataBlockPtr rowIdBlock;
1246  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
1247  auto colMapIt = columnMap_.find(rowIdColId_);
1248  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
1249  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
1250  }
1251 
1252  currentFragment->shadowNumTuples =
1253  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
1254  numRowsLeft -= numRowsToInsert;
1255  numRowsInserted += numRowsToInsert;
// Copy shadow counters and metadata to the physical ones for every fragment
// from startFragment onwards.
1256  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
1257  partIt != fragmentInfoVec_.end();
1258  ++partIt) {
1259  auto fragment_ptr = partIt->get();
1260  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1261  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
1262  }
1263  }
1264  }
1265  numTuples_ += insert_data.numRows;
// NOTE(review): line 1266 is not shown in this listing.
1267 }
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::unique_lock< T > unique_lock
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:233
void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData insert_data_struct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 820 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

// Body of InsertOrderFragmenter::insertDataNoCheckpoint
// (InsertOrderFragmenter.cpp, lines 820-830).
// Performs the same insertDataImpl()/addColumns() dispatch as insertData(), but
// this listing shows no checkpoint call and no exception handling.
// NOTE(review): source line 822 is missing from this listing; the dangling
// "insertMutex_);" below suggests it declares a lock on insertMutex_ -- confirm
// against InsertOrderFragmenter.cpp.
820  {
821  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
823  insertMutex_); // prevent two threads from trying to insert into the same table
824  // simultaneously
825  if (!isAddingNewColumns(insert_data_struct)) {
826  insertDataImpl(insert_data_struct);
827  } else {
828  addColumns(insert_data_struct);
829  }
830 }
void addColumns(const InsertData &insertDataStruct)
std::unique_lock< T > unique_lock
bool isAddingNewColumns(const InsertData &insert_data) const

+ Here is the caller graph for this function:

bool Fragmenter_Namespace::InsertOrderFragmenter::isAddingNewColumns ( const InsertData insert_data) const
private

Definition at line 749 of file InsertOrderFragmenter.cpp.

References CHECK, and Fragmenter_Namespace::InsertData::columnIds.

// Body of InsertOrderFragmenter::isAddingNewColumns
// (InsertOrderFragmenter.cpp, lines 749-762).
// Returns true when none of insert_data.columnIds is present in columnMap_ and
// false when all of them are. A mixture of known and unknown column ids makes
// the CHECK condition false.
// NOTE(review): with an empty columnIds both flags keep their initial value of
// true, so the XOR is false and the CHECK condition is false in that case too.
749  {
750  bool all_columns_already_exist = true, all_columns_are_new = true;
751  for (const auto column_id : insert_data.columnIds) {
752  if (columnMap_.find(column_id) == columnMap_.end()) {
753  all_columns_already_exist = false;
754  } else {
755  all_columns_are_new = false;
756  }
757  }
758  // only one should be TRUE
759  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
760  CHECK(either_all_exist_or_all_new);
761  return all_columns_are_new;
762 }
#define CHECK(condition)
Definition: Logger.h:291
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::resetSizesFromFragments ( )
overridevirtual

Resets the fragmenter's size related metadata using the internal fragment info vector. This is typically done after operations, such as vacuuming, which can change fragment sizes.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1369 of file InsertOrderFragmenter.cpp.

// Body of InsertOrderFragmenter::resetSizesFromFragments
// (InsertOrderFragmenter.cpp, lines 1369-1376).
// Recomputes numTuples_ as the sum of getPhysicalNumTuples() over every entry of
// fragmentInfoVec_.
// NOTE(review): source lines 1370 and 1375 are missing from this listing. A
// shared_lock is referenced for this function, so one of them is presumably a
// lock acquisition; the other cannot be determined from here -- confirm against
// InsertOrderFragmenter.cpp.
1369  {
1371  numTuples_ = 0;
1372  for (const auto& fragment_info : fragmentInfoVec_) {
1373  numTuples_ += fragment_info->getPhysicalNumTuples();
1374  }
1376 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_lock< T > shared_lock
void Fragmenter_Namespace::InsertOrderFragmenter::setLastFragmentVarLenColumnSizes ( )
private

Definition at line 1474 of file InsertOrderFragmenter.cpp.

// Body of InsertOrderFragmenter::setLastFragmentVarLenColumnSizes
// (InsertOrderFragmenter.cpp, lines 1474-1493).
// For the last fragment in fragmentInfoVec_, fetches the chunk buffer of every
// column in columnMap_ at defaultInsertLevel_ and, for each column tracked in
// varLenColInfo_, stores the buffer's current size as that column's byte count.
// Does nothing when uses_foreign_storage_ is set or when there are no fragments.
1474  {
1475  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1476  // Now need to get the insert buffers for each column - should be last
1477  // fragment
1478  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1479  // TODO: add accessor here for safe indexing
// deviceIds is indexed by the memory level's integer value.
1480  int deviceId =
1481  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1482  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
// Chunk key layout built here: chunkKeyPrefix_ entries, then column id, then fragment id.
1483  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1484  insertKey.push_back(colIt->first); // column id
1485  insertKey.push_back(lastFragmentId); // fragment id
1486  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1487  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1488  if (varLenColInfoIt != varLenColInfo_.end()) {
1489  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1490  }
1491  }
1492  }
1493 }
std::vector< int > ChunkKey
Definition: types.h:36
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor cd,
std::unordered_map< int, ChunkStats > &  stats_map,
std::optional< Data_Namespace::MemoryLevel memory_level 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 648 of file InsertOrderFragmenter.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

// Body of InsertOrderFragmenter::updateChunkStats
// (InsertOrderFragmenter.cpp, lines 651-737).
// For every fragment that has an entry in stats_map (keyed by fragment id), loads
// the chunk of column `cd`, asks the chunk's encoder to reset its statistics to
// the supplied values and, when the encoder reports a reset, stores the
// encoder's refreshed metadata on the fragment and marks the buffer dirty.
// Fragments without an entry in stats_map only produce a warning.
// Throws std::runtime_error when a chunk buffer has no encoder.
// NOTE(review): source lines 653-658, 674, 700 and 728 are missing from this
// listing; the lines shown are therefore not a complete statement sequence --
// confirm details against InsertOrderFragmenter.cpp.
651  {
652  // synchronize concurrent accesses to fragmentInfoVec_
// NOTE(review): the warning says "Skipping", but no return follows it in the lines
// shown here, so execution continues below -- confirm whether that is intended.
659  if (shard_ >= 0) {
660  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
661  }
662 
663  CHECK(cd);
664  const auto column_id = cd->columnId;
665  const auto col_itr = columnMap_.find(column_id);
666  CHECK(col_itr != columnMap_.end());
667 
668  for (auto const& fragment : fragmentInfoVec_) {
669  auto stats_itr = stats_map.find(fragment->fragmentId);
670  if (stats_itr != stats_map.end()) {
671  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
672  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
// NOTE(review): line 674 (one element of the chunk key) is not shown.
673  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
675  column_id,
676  fragment->fragmentId};
// Falls back to defaultInsertLevel_ when the caller passed no memory level.
677  auto chunk = Chunk_NS::Chunk::getChunk(cd,
678  &catalog_->getDataMgr(),
679  chunk_key,
680  memory_level.value_or(defaultInsertLevel_),
681  0,
682  chunk_meta_it->second->numBytes,
683  chunk_meta_it->second->numElements);
684  auto buf = chunk->getBuffer();
685  CHECK(buf);
686  if (!buf->hasEncoder()) {
687  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
688  }
689  auto encoder = buf->getEncoder();
690 
691  auto chunk_stats = stats_itr->second;
692 
// Capture the encoder's metadata before the reset so old and new values can be logged.
693  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
694  encoder->getMetadata(old_chunk_metadata);
695  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
696 
697  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
698  // Use the logical type to display data, since the encoding should be ignored
// NOTE(review): line 700 (the first branch of this conditional) is not shown.
699  const auto logical_ti = cd->columnType.is_dict_encoded_string()
701  : get_logical_type_info(cd->columnType);
702  if (!didResetStats) {
703  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
704  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
705  << DatumToString(chunk_stats.max, logical_ti);
706  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
707  << DatumToString(chunk_stats.min, logical_ti);
708  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
709  continue; // move to next fragment
710  }
711 
712  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
713  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
714  << DatumToString(chunk_stats.max, logical_ti);
715  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
716  << DatumToString(chunk_stats.min, logical_ti);
717  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
718 
719  // Reset fragment metadata map and set buffer to dirty
720  auto new_metadata = std::make_shared<ChunkMetadata>();
721  // Run through fillChunkStats to ensure any transformations to the raw metadata
722  // values get applied (e.g. for date in days)
723  encoder->getMetadata(new_metadata);
724 
725  fragment->setChunkMetadata(column_id, new_metadata);
726  fragment->shadowChunkMetadataMap =
727  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
// NOTE(review): line 728 is not shown; the brace on line 730 presumably closes a
// block opened there.
729  buf->setDirty();
730  }
731  } else {
// NOTE(review): the message below emits ", " twice in a row between the table id
// and "column".
732  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
733  << ", table " << physicalTableId_ << ", "
734  << ", column " << column_id;
735  }
736  }
737 }
std::vector< int > ChunkKey
Definition: types.h:36
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:460
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:266
#define LOG(tag)
Definition: Logger.h:285
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1470
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
std::unique_lock< T > unique_lock
#define CHECK(condition)
Definition: Logger.h:291
bool is_dict_encoded_string() const
Definition: sqltypes.h:641
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

std::optional< ChunkUpdateStats > Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 639 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, CHECK_GT, Fragmenter_Namespace::ChunkUpdateStats::chunk, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), Fragmenter_Namespace::ChunkUpdateStats::fragment_rows_count, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::ChunkUpdateStats::new_values_stats, Fragmenter_Namespace::ChunkUpdateStats::old_values_stats, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, TableDescriptor::tableId, temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, Fragmenter_Namespace::anonymous_namespace{UpdelStorage.cpp}::update_metadata(), foreign_storage::update_stats(), updateColumnMetadata(), Fragmenter_Namespace::ChunkUpdateStats::updated_rows_count, NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

// Body of the vector-valued InsertOrderFragmenter::updateColumn overload
// (UpdelStorage.cpp, lines 648-925).
// Overwrites, in place, the elements of column `cd` in fragment `fragment_id` at
// the row offsets listed in frag_offsets. rhs_values holds either one value per
// offset or a single value that is applied to every offset (see the CHECK on
// line 659 and the indexing on line 719).
// The offsets are split into segments of `segsz` rows. Each segment is handled by
// a std::async task that writes into the chunk buffer and accumulates statistics
// in its own update_stats_per_thread[c] slot; the slots are merged afterwards.
// Returns an empty optional when frag_offsets is empty, or after compactRows()
// has been called on the deleted-column path; otherwise returns the merged
// ChunkUpdateStats. Several branches throw std::runtime_error for unsupported
// conversions.
// NOTE(review): source lines 670, 757, 768, 795, 804, 826, 829, 844, 861, 876,
// 882, 901 and 919 are missing from this listing, which is why several argument
// lists below begin without a visible call. Going by the function's reference
// list these are mostly update_metadata(...) call heads, plus the start of the
// vacuum condition (901) and the updateColumnMetadata(...) call (919) -- confirm
// against UpdelStorage.cpp.
648  {
// Record the transaction context on the caller-supplied UpdelRoll.
649  updel_roll.catalog = catalog;
650  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
651  updel_roll.memoryLevel = memory_level;
652 
653  const size_t ncore = cpu_threads();
654  const auto nrow = frag_offsets.size();
655  const auto n_rhs_values = rhs_values.size();
656  if (0 == nrow) {
657  return {};
658  }
659  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
660 
661  auto fragment_ptr = getFragmentInfo(fragment_id);
662  auto& fragment = *fragment_ptr;
663  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
664  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
665  ChunkKey chunk_key{
666  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
// NOTE(review): line 670 (the memory-level argument of getChunk) is not shown.
667  auto chunk = Chunk_NS::Chunk::getChunk(cd,
668  &catalog->getDataMgr(),
669  chunk_key,
671  0,
672  chunk_meta_it->second->numBytes,
673  chunk_meta_it->second->numElements);
674 
// One statistics slot per possible task; slot c is written only by task c.
675  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
676 
677  // parallel update elements
678  std::vector<std::future<void>> threads;
679 
// Ceiling division: rows per task.
680  const auto segsz = (nrow + ncore - 1) / ncore;
681  auto dbuf = chunk->getBuffer();
682  auto dbuf_addr = dbuf->getMemoryPtr();
683  dbuf->setUpdated();
684  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
685  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
686  threads.emplace_back(std::async(
687  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
688  SQLTypeInfo lhs_type = cd->columnType;
689 
690  // !! not sure if this is a undocumented convention or a bug, but for a sharded
691  // table the dictionary id of a encoded string column is not specified by
692  // comp_param in physical table but somehow in logical table :) comp_param in
693  // physical table is always 0, so need to adapt accordingly...
694  auto cdl = (shard_ < 0)
695  ? cd
696  : catalog->getMetadataForColumn(
697  catalog->getLogicalTableId(td->tableId), cd->columnId);
698  CHECK(cdl);
699  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
700  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
701  lhs_type, &decimalOverflowValidator);
702  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
703  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
704  lhs_type, &dateDaysOverflowValidator);
705 
// Dictionary of the target column; only looked up for string targets.
706  StringDictionary* stringDict{nullptr};
707  if (lhs_type.is_string()) {
708  CHECK(kENCODING_DICT == lhs_type.get_compression());
709  auto dictDesc = const_cast<DictDescriptor*>(
710  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
711  CHECK(dictDesc);
712  stringDict = dictDesc->stringDict.get();
713  CHECK(stringDict);
714  }
715 
716  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
717  const auto roffs = frag_offsets[r];
// Address of the element being overwritten: buffer base + row offset * element size.
718  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
719  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
720  ScalarTargetValue sv2;
721 
722  // Subtle here is on the two cases of string-to-string assignments, when
723  // upstream passes RHS string as a string index instead of a preferred "real
724  // string".
725  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
726  // index
727  // in this layer, so if upstream passes a str idx here, an
728  // exception is thrown.
729  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
730  // str idx.
731  if (rhs_type.is_string()) {
732  if (const auto vp = boost::get<int64_t>(sv)) {
733  auto dictDesc = const_cast<DictDescriptor*>(
734  catalog->getMetadataForDict(rhs_type.get_comp_param()));
735  if (nullptr == dictDesc) {
736  throw std::runtime_error(
737  "UPDATE does not support cast from string literal to string "
738  "column.");
739  }
// This inner stringDict is the source column's dictionary and shadows the outer one.
740  auto stringDict = dictDesc->stringDict.get();
741  CHECK(stringDict);
742  sv2 = NullableString(stringDict->getString(*vp));
743  sv = &sv2;
744  }
745  }
746 
// Dispatch on the alternative held by the value: int64_t, double, float or NullableString.
747  if (const auto vp = boost::get<int64_t>(sv)) {
748  auto v = *vp;
749  if (lhs_type.is_string()) {
750  throw std::runtime_error("UPDATE does not support cast to string.");
751  }
752  int64_t old_val;
753  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
754  // Handle special case where date column with date in days encoding stores
755  // metadata in epoch seconds.
// NOTE(review): line 757 (the body of this if) is not shown.
756  if (lhs_type.is_date_in_days()) {
758  }
759  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
760  if (lhs_type.is_decimal()) {
761  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
762  int64_t decimal_val;
763  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
764  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
765  lhs_type.get_notnull() == false)
766  ? v
767  : decimal_val;
769  lhs_type, update_stats_per_thread[c], target_value, old_val);
// A sign difference between the input and the value read back is reported as overflow.
770  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
771  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
772  if (positive_v_and_negative_d || negative_v_and_positive_d) {
773  throw std::runtime_error(
774  "Data conversion overflow on " + std::to_string(v) +
775  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
776  std::to_string(rhs_type.get_scale()) + ") to (" +
777  std::to_string(lhs_type.get_dimension()) + ", " +
778  std::to_string(lhs_type.get_scale()) + ")");
779  }
780  } else if (is_integral(lhs_type)) {
781  if (lhs_type.is_date_in_days()) {
782  // Store meta values in seconds
783  if (lhs_type.get_size() == 2) {
784  nullAwareDateOverflowValidator.validate<int16_t>(v);
785  } else {
786  nullAwareDateOverflowValidator.validate<int32_t>(v);
787  }
788  int64_t days;
789  get_scalar<int64_t>(data_ptr, lhs_type, days);
790  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
791  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
792  lhs_type.get_notnull() == false)
793  ? NullSentinelSupplier()(lhs_type, v)
794  : seconds;
796  lhs_type, update_stats_per_thread[c], target_value, old_val);
797  } else {
798  int64_t target_value;
799  if (rhs_type.is_decimal()) {
800  target_value = round(decimal_to_double(rhs_type, v));
801  } else {
802  target_value = v;
803  }
805  lhs_type, update_stats_per_thread[c], target_value, old_val);
806  }
807  } else {
808  if (rhs_type.is_decimal()) {
809  update_metadata(lhs_type,
810  update_stats_per_thread[c],
811  decimal_to_double(rhs_type, v),
812  double(old_val));
813  } else {
814  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
815  }
816  }
817  } else if (const auto vp = boost::get<double>(sv)) {
818  auto v = *vp;
819  if (lhs_type.is_string()) {
820  throw std::runtime_error("UPDATE does not support cast to string.");
821  }
822  double old_val;
823  get_scalar<double>(data_ptr, lhs_type, old_val);
824  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
825  if (lhs_type.is_integer()) {
827  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
828  } else if (lhs_type.is_fp()) {
830  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
831  } else {
832  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
833  "LHS with a floating RHS.";
834  }
835  } else if (const auto vp = boost::get<float>(sv)) {
836  auto v = *vp;
837  if (lhs_type.is_string()) {
838  throw std::runtime_error("UPDATE does not support cast to string.");
839  }
840  float old_val;
841  get_scalar<float>(data_ptr, lhs_type, old_val);
842  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
843  if (lhs_type.is_integer()) {
845  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
846  } else {
847  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
848  }
849  } else if (const auto vp = boost::get<NullableString>(sv)) {
// A NullableString that holds no std::string is treated as the empty string.
850  const auto s = boost::get<std::string>(vp);
851  const auto sval = s ? *s : std::string("");
852  if (lhs_type.is_string()) {
853  decltype(stringDict->getOrAdd(sval)) sidx;
854  {
// getOrAdd on the dictionary is serialized across the tasks through temp_mutex_.
855  std::unique_lock<std::mutex> lock(temp_mutex_);
856  sidx = stringDict->getOrAdd(sval);
857  }
858  int64_t old_val;
859  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
860  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
862  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
863  } else if (sval.size() > 0) {
// Non-string target: the text is parsed with atof, except booleans, which are
// matched against the literal spellings on line 866.
864  auto dval = std::atof(sval.data());
865  if (lhs_type.is_boolean()) {
866  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
867  } else if (lhs_type.is_time()) {
868  throw std::runtime_error(
869  "Date/Time/Timestamp update not supported through translated "
870  "string path.");
871  }
872  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
873  double old_val;
874  get_scalar<double>(data_ptr, lhs_type, old_val);
875  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
877  lhs_type, update_stats_per_thread[c], double(dval), old_val);
878  } else {
879  int64_t old_val;
880  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
881  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
883  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
884  }
885  } else {
// An empty string written to a non-string column stores NULL.
886  put_null(data_ptr, lhs_type, cd->columnName);
887  update_stats_per_thread[c].new_values_stats.has_null = true;
888  }
889  } else {
890  CHECK(false);
891  }
892  }
893  }));
// Cap the number of outstanding tasks at cpu_threads().
894  if (threads.size() >= (size_t)cpu_threads()) {
895  wait_cleanup_threads(threads);
896  }
897  }
898  wait_cleanup_threads(threads);
899 
900  // for unit test
// NOTE(review): line 901 is not shown; the extra closing brace on line 909 presumably
// matches a condition opened there (FragmentInfo::unconditionalVacuum_ is in this
// function's reference list).
902  if (cd->isDeletedCol) {
903  const auto deleted_offsets = getVacuumOffsets(chunk);
904  if (deleted_offsets.size() > 0) {
905  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
906  return {};
907  }
908  }
909  }
// Merge the per-task statistics into a single result.
910  ChunkUpdateStats update_stats;
911  for (size_t c = 0; c < ncore; ++c) {
912  update_metadata(update_stats.new_values_stats,
913  update_stats_per_thread[c].new_values_stats);
914  update_metadata(update_stats.old_values_stats,
915  update_stats_per_thread[c].old_values_stats);
916  }
917 
918  CHECK_GT(fragment.shadowNumTuples, size_t(0));
// NOTE(review): line 919 (the call these arguments belong to) is not shown.
920  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
921  update_stats.updated_rows_count = nrow;
922  update_stats.fragment_rows_count = fragment.shadowNumTuples;
923  update_stats.chunk = chunk;
924  return update_stats;
925 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
std::vector< int > ChunkKey
Definition: types.h:36
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:266
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
#define UNREACHABLE()
Definition: Logger.h:338
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
future< Result > async(Fn &&fn, Args &&...args)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:5008
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1904
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:179
int logicalTableId
Definition: UpdelRoll.h:54
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
#define CHECK(condition)
Definition: Logger.h:291
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
Descriptor for a dictionary for a string column.
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:559
int cpu_threads()
Definition: thread_count.h:25
bool is_decimal() const
Definition: sqltypes.h:568
std::string columnName
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const ColumnDescriptor cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue rhs_value,
const SQLTypeInfo rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 55 of file UpdelStorage.cpp.

References updateColumn().

63  {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata metadata 
)
overridevirtual

Updates the metadata for a column chunk.

Parameters
cd - ColumnDescriptor for the column
fragment_id - Fragment id of the chunk within the column
metadata - shared_ptr of the metadata to update the column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 635 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

638  {
639  // synchronize concurrent accesses to fragmentInfoVec_
641 
642  CHECK(metadata.get());
643  auto fragment_info = getFragmentInfo(fragment_id);
644  CHECK(fragment_info);
645  fragment_info->setChunkMetadata(cd->columnId, metadata);
646 }
std::unique_lock< T > unique_lock
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:291
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor cd,
FragmentInfo fragment,
std::shared_ptr< Chunk_NS::Chunk chunk,
const UpdateValuesStats update_values_stats,
const SQLTypeInfo rhs_type,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 927 of file UpdelStorage.cpp.

References UpdelRoll::catalog, ColumnDescriptor::columnId, ColumnDescriptor::columnType, fragmentInfoMutex_, UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getMetadataForTable(), Fragmenter_Namespace::UpdateValuesStats::has_null, SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, Fragmenter_Namespace::UpdateValuesStats::max_double, Fragmenter_Namespace::UpdateValuesStats::max_int64t, Fragmenter_Namespace::UpdateValuesStats::min_double, Fragmenter_Namespace::UpdateValuesStats::min_int64t, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

933  {
935  auto buffer = chunk->getBuffer();
936  const auto& lhs_type = cd->columnType;
937 
938  auto encoder = buffer->getEncoder();
939  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
940  static_assert(std::is_same<decltype(min), decltype(max)>::value,
941  "Type mismatch on min/max");
942  if (has_null) {
943  encoder->updateStats(decltype(min)(), true);
944  }
945  if (max < min) {
946  return;
947  }
948  encoder->updateStats(min, false);
949  encoder->updateStats(max, false);
950  };
951 
952  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
953  update_stats(new_values_stats.min_int64t,
954  new_values_stats.max_int64t,
955  new_values_stats.has_null);
956  } else if (lhs_type.is_fp()) {
957  update_stats(new_values_stats.min_double,
958  new_values_stats.max_double,
959  new_values_stats.has_null);
960  } else if (lhs_type.is_decimal()) {
961  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
962  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
963  new_values_stats.has_null);
964  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
965  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
966  update_stats(new_values_stats.min_int64t,
967  new_values_stats.max_int64t,
968  new_values_stats.has_null);
969  }
970  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
971  auto chunk_metadata =
972  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
973  buffer->getEncoder()->getMetadata(chunk_metadata);
974 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
std::unique_lock< T > unique_lock
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:568
bool is_integral(const SQLTypeInfo &t)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog catalog,
const TableDescriptor td,
const int  fragmentId,
const std::vector< TargetMetaInfo sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll updelRoll,
Executor executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 268 of file UpdelStorage.cpp.

References UpdelRoll::addDirtyChunk(), threading_serial::async(), UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::InsertData::columnIds, cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, g_enable_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), UpdelRoll::getChunkMetadata(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), Fragmenter_Namespace::InsertData::is_default, SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTILINESTRING, kMULTIPOINT, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

278  {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
326  num_rows,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  *catalog,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().getStringDictKey(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr};
340  auto converter = factory.create(param);
341  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
342 
343  if (targetDescriptor->columnType.is_geometry()) {
344  // geometry columns are composites
345  // need to skip chunks, depending on geo type
346  switch (targetDescriptor->columnType.get_type()) {
347  case kMULTIPOLYGON:
348  indexOfChunk += 5;
349  break;
350  case kPOLYGON:
351  indexOfChunk += 4;
352  break;
353  case kMULTILINESTRING:
354  indexOfChunk += 3;
355  break;
356  case kLINESTRING:
357  case kMULTIPOINT:
358  indexOfChunk += 2;
359  break;
360  case kPOINT:
361  indexOfChunk += 1;
362  break;
363  default:
364  CHECK(false); // not supported
365  }
366  }
367  } else {
368  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
369  std::unique_ptr<ChunkToInsertDataConverter> converter;
370 
371  if (chunk_cd->columnType.is_fixlen_array()) {
372  converter =
373  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
374  } else if (chunk_cd->columnType.is_string()) {
375  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
376  } else if (chunk_cd->columnType.is_geometry()) {
377  // the logical geo column is a string column
378  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
379  } else {
380  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
381  }
382 
383  chunkConverters.push_back(std::move(converter));
384 
385  } else if (chunk_cd->columnType.is_date_in_days()) {
386  /* Q: Why do we need this?
387  A: In the variable-length update path we move the chunk content of a column
388  without decoding it. Since it passes through DateDaysEncoder again,
389  the encoder expects the values in seconds, but here they are in days.
390  Therefore DateChunkConverter scales the chunk values to seconds,
391  which DateDaysEncoder then ultimately encodes in days.
392  */
393  std::unique_ptr<ChunkToInsertDataConverter> converter;
394  const size_t physical_size = chunk_cd->columnType.get_size();
395  if (physical_size == 2) {
396  converter =
397  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
398  } else if (physical_size == 4) {
399  converter =
400  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
401  } else {
402  CHECK(false);
403  }
404  chunkConverters.push_back(std::move(converter));
405  } else {
406  std::unique_ptr<ChunkToInsertDataConverter> converter;
407  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
408  int logical_size = logical_type.get_size();
409  int physical_size = chunk_cd->columnType.get_size();
410 
411  if (logical_type.is_string()) {
412  // for dicts -> logical = physical
413  logical_size = physical_size;
414  }
415 
416  if (8 == physical_size) {
417  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
418  num_rows, chunk.get());
419  } else if (4 == physical_size) {
420  if (8 == logical_size) {
421  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
422  num_rows, chunk.get());
423  } else {
424  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
425  num_rows, chunk.get());
426  }
427  } else if (2 == chunk_cd->columnType.get_size()) {
428  if (8 == logical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
430  num_rows, chunk.get());
431  } else if (4 == logical_size) {
432  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
433  num_rows, chunk.get());
434  } else {
435  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
436  num_rows, chunk.get());
437  }
438  } else if (1 == chunk_cd->columnType.get_size()) {
439  if (8 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
441  num_rows, chunk.get());
442  } else if (4 == logical_size) {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
444  num_rows, chunk.get());
445  } else if (2 == logical_size) {
446  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
447  num_rows, chunk.get());
448  } else {
449  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
450  num_rows, chunk.get());
451  }
452  } else {
453  CHECK(false); // unknown
454  }
455 
456  chunkConverters.push_back(std::move(converter));
457  }
458  }
459  }
460 
461  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
462  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
463 
464  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
465  bool* deletedChunkBuffer =
466  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
467 
468  std::atomic<size_t> row_idx{0};
469 
470  auto row_converter = [&sourceDataProvider,
471  &sourceDataConverters,
472  &indexOffFragmentOffsetColumn,
473  &chunkConverters,
474  &deletedChunkBuffer,
475  &row_idx](size_t indexOfEntry) -> void {
476  // convert the source data
477  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
478  if (row.empty()) {
479  return;
480  }
481 
482  size_t indexOfRow = row_idx.fetch_add(1);
483 
484  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
485  if (sourceDataConverters[col]) {
486  const auto& mapd_variant = row[col];
487  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
488  }
489  }
490 
491  auto scalar = checked_get(
492  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
493  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
494 
495  // convert the remaining chunks
496  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
497  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
498  }
499 
500  // now mark the row as deleted
501  deletedChunkBuffer[indexInChunkBuffer] = true;
502  };
503 
504  bool can_go_parallel = num_rows > 20000;
505 
506  if (can_go_parallel) {
507  const size_t num_worker_threads = cpu_threads();
508  std::vector<std::future<void>> worker_threads;
509  for (size_t i = 0,
510  start_entry = 0,
511  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
512  i < num_worker_threads && start_entry < num_entries;
513  ++i, start_entry += stride) {
514  const auto end_entry = std::min(start_entry + stride, num_rows);
515  worker_threads.push_back(std::async(
517  [&row_converter](const size_t start, const size_t end) {
518  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
519  row_converter(indexOfRow);
520  }
521  },
522  start_entry,
523  end_entry));
524  }
525 
526  for (auto& child : worker_threads) {
527  child.wait();
528  }
529 
530  } else {
531  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
532  row_converter(entryIdx);
533  }
534  }
535 
537  insert_data.databaseId = catalog->getCurrentDB().dbId;
538  insert_data.tableId = td->tableId;
539 
540  for (size_t i = 0; i < chunkConverters.size(); i++) {
541  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
542  continue;
543  }
544 
545  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
546  if (sourceDataConverters[i]) {
547  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
548  }
549  continue;
550  }
551 
552  insert_data.numRows = num_rows;
553  insert_data.is_default.resize(insert_data.columnIds.size(), false);
554  insertDataNoCheckpoint(insert_data);
555 
556  // update metadata for deleted chunk as we are doing special handling
557  auto chunkMetadata =
558  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
559  chunkMetadata->chunkStats.max.boolval = 1;
560 
561  // I'm not completely sure that we need to do this both in the fragmenter and on the
562  // buffer, but leaving this alone for now
563  if (!deletedChunk->getBuffer()->hasEncoder()) {
564  deletedChunk->initEncoder();
565  }
566  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
567 
568  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
569  // An append to the same fragment will increase shadowNumTuples.
570  // Update NumElems in this case. Otherwise, use existing NumElems.
571  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
572  }
573  deletedChunk->getBuffer()->setUpdated();
574 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:55
bool is_varlen_update
Definition: UpdelRoll.h:57
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
void addDirtyChunk(std::shared_ptr< Chunk_NS::Chunk > chunk, int fragment_id)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:1470
std::vector< bool > is_default
Definition: Fragmenter.h:75
bool g_enable_string_functions
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:70
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:72
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
future< Result > async(Fn &&fn, Args &&...args)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:5008
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:54
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
#define CHECK(condition)
Definition: Logger.h:291
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.
bool is_string() const
Definition: sqltypes.h:559
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:25
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:71
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog catalog,
const MetaDataKey key,
UpdelRoll updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 976 of file UpdelStorage.cpp.

References fragmentInfoMutex_, UpdelRoll::getChunkMetadataMap(), and UpdelRoll::getNumTuple().

978  {
980  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
981  auto& fragmentInfo = *key.second;
982  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
983  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
984  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
985  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
986 }
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
std::unique_lock< T > unique_lock
size_t getNumTuple(const MetaDataKey &key) const

+ Here is the call graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1080 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1083  {
1084  const auto cd = chunk->getColumnDesc();
1085  const auto& col_type = cd->columnType;
1086  auto data_buffer = chunk->getBuffer();
1087  auto data_addr = data_buffer->getMemoryPtr();
1088  auto element_size =
1089  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1090  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1091  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1092  size_t nbytes_fix_data_to_keep = 0;
1093  auto nrows_to_vacuum = frag_offsets.size();
1094  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1095  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1096  auto is_last_one = irow == nrows_to_vacuum;
1097  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1098  auto maddr_to_vacuum = data_addr;
1099  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1100  if (nrows_to_keep > 0) {
1101  auto nbytes_to_keep = nrows_to_keep * element_size;
1102  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1103  // move curr fixlen row block toward front
1104  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1105  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1106  nbytes_to_keep);
1107  }
1108  irow_of_blk_to_fill += nrows_to_keep;
1109  nbytes_fix_data_to_keep += nbytes_to_keep;
1110  }
1111  irow_of_blk_to_keep = irow_to_vacuum + 1;
1112  }
1113  return nbytes_fix_data_to_keep;
1114 }

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1194 of file UpdelStorage.cpp.

References CHECK, Fragmenter_Namespace::get_buffer_offset(), Fragmenter_Namespace::get_null_padding(), Fragmenter_Namespace::get_var_len_null_array_indexes(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1197  {
1198  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1199  auto data_buffer = chunk->getBuffer();
1200  CHECK(data_buffer);
1201  auto index_buffer = chunk->getIndexBuf();
1202  CHECK(index_buffer);
1203  auto data_addr = data_buffer->getMemoryPtr();
1204  auto indices_addr = index_buffer->getMemoryPtr();
1205  CHECK(indices_addr);
1206  auto index_array = (StringOffsetT*)indices_addr;
1207  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1208  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1209  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1210  size_t null_padding =
1211  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1212  size_t nbytes_var_data_to_keep = null_padding;
1213  auto null_array_indexes = get_var_len_null_array_indexes(
1214  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1215  auto nrows_to_vacuum = frag_offsets.size();
1216  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1217  auto is_last_one = irow == nrows_to_vacuum;
1218  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1219  auto maddr_to_vacuum = data_addr;
1220  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1221  if (nrows_to_keep > 0) {
1222  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1223  auto deleted_row_start_offset =
1224  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1225  auto kept_row_start_offset =
1226  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1227  auto nbytes_to_keep =
1228  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1229  kept_row_start_offset;
1230  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1231  if (nbytes_to_keep > 0) {
1232  CHECK(data_addr);
1233  // move curr varlen row block toward front
1234  memmove(data_addr + ibyte_var_data_to_keep,
1235  data_addr + kept_row_start_offset,
1236  nbytes_to_keep);
1237  }
1238 
1239  const auto base_offset = kept_row_start_offset;
1240  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1241  auto update_index = irow_of_blk_to_keep + i;
1242  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1243  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1244  }
1245  }
1246  nbytes_var_data_to_keep += nbytes_to_keep;
1247  maddr_to_vacuum = indices_addr;
1248 
1249  constexpr static auto index_element_size = sizeof(StringOffsetT);
1250  nbytes_to_keep = nrows_to_keep * index_element_size;
1251  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1252  // move curr block of index (offset) entries toward front
1253  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1254  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1255  nbytes_to_keep);
1256  }
1257  irow_of_blk_to_fill += nrows_to_keep;
1258  }
1259  irow_of_blk_to_keep = irow_to_vacuum + 1;
1260  }
1261 
1262  // Set expected null padding, last offset, and negative values for null array offsets.
1263  index_array[0] = null_padding;
1264  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1265  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1266  if (!is_varlen_array) {
1267  CHECK(null_array_indexes.empty());
1268  }
1269  for (auto index : null_array_indexes) {
1270  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1271  }
1272  return nbytes_var_data_to_keep;
1273 }
std::set< size_t > get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
int32_t StringOffsetT
Definition: sqltypes.h:1493
#define CHECK(condition)
Definition: Logger.h:291
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 201 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 223 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 217 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 219 of file InsertOrderFragmenter.h.

Referenced by updateColumnMetadata(), and updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 204 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 225 of file InsertOrderFragmenter.h.

heavyai::shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 221 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 215 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 214 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 210 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 216 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 228 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 213 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 211 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 226 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 209 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutableprotected

Definition at line 253 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::vector<std::unique_ptr<Chunk_NS::Chunk> > Fragmenter_Namespace::InsertOrderFragmenter::tracked_in_memory_chunks_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 224 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 227 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: