OmniSciDB  bf83d84833
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order; it is the default fragmenter. More...

#include <InsertOrderFragmenter.h>

+ Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
+ Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row counts of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these must be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void conditionallyInstantiateFileMgrWithParams ()
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order; it is the default fragmenter.

InsertOrderFragmenter

Definition at line 52 of file InsertOrderFragmenter.h.
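For orientation, here is a minimal usage sketch (not taken from the codebase; db_id, table_id, data_mgr, catalog, and the chunk vector are placeholders supplied by the caller) showing how a fragmenter for one physical table is constructed and then asked for its fragment layout:

// Hypothetical sketch only -- identifiers below are placeholders, not part of this API.
std::vector<int> chunk_key_prefix{db_id, table_id};   // {database_id, table_id}
std::vector<Chunk_NS::Chunk> chunk_vec;               // one Chunk per column, filled by the caller
Fragmenter_Namespace::InsertOrderFragmenter fragmenter(chunk_key_prefix,
                                                       chunk_vec,
                                                       data_mgr,   // Data_Namespace::DataMgr*
                                                       catalog,    // Catalog_Namespace::Catalog*
                                                       table_id,
                                                       /*shard=*/-1);

// Query planning asks the fragmenter which fragments exist and how many rows each holds.
auto table_info = fragmenter.getFragmentsForQuery();
size_t total_rows = table_info.getPhysicalNumTuples();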

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 94 of file InsertOrderFragmenter.cpp.

94 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter & )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1159 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfo(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1164  {
1165  auto fragment_ptr = getFragmentInfo(fragment_id);
1166  auto& fragment = *fragment_ptr;
1167  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1168  const auto ncol = chunks.size();
1169 
1170  std::vector<int8_t> has_null_per_thread(ncol, 0);
1171  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1172  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1173  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::min());
1174  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::max());
1175 
1176  // parallel delete columns
1177  std::vector<std::future<void>> threads;
1178  auto nrows_to_vacuum = frag_offsets.size();
1179  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1180  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1181 
1182  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1183  auto chunk = chunks[ci];
1184  const auto cd = chunk->getColumnDesc();
1185  const auto& col_type = cd->columnType;
1186  auto data_buffer = chunk->getBuffer();
1187  auto index_buffer = chunk->getIndexBuf();
1188  auto data_addr = data_buffer->getMemoryPtr();
1189  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1190  auto index_array = (StringOffsetT*)indices_addr;
1191  bool is_varlen = col_type.is_varlen_indeed();
1192 
1193  auto fixlen_vacuum = [=,
1194  &has_null_per_thread,
1195  &max_double_per_thread,
1196  &min_double_per_thread,
1197  &min_int64t_per_thread,
1198  &max_int64t_per_thread,
1199  &updel_roll,
1200  &frag_offsets,
1201  &fragment] {
1202  size_t nbytes_fix_data_to_keep;
1203  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1204 
1205  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1206  data_buffer->setSize(nbytes_fix_data_to_keep);
1207  data_buffer->setUpdated();
1208 
1209  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1210 
1211  auto daddr = data_addr;
1212  auto element_size =
1213  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1214  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1215  if (col_type.is_fixlen_array()) {
1216  auto encoder =
1217  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1218  CHECK(encoder);
1219  encoder->updateMetadata((int8_t*)daddr);
1220  } else if (col_type.is_fp()) {
1221  set_chunk_stats(col_type,
1222  data_addr,
1223  has_null_per_thread[ci],
1224  min_double_per_thread[ci],
1225  max_double_per_thread[ci]);
1226  } else {
1227  set_chunk_stats(col_type,
1228  data_addr,
1229  has_null_per_thread[ci],
1230  min_int64t_per_thread[ci],
1231  max_int64t_per_thread[ci]);
1232  }
1233  }
1234  };
1235 
1236  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1237  size_t nbytes_var_data_to_keep;
1238  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1239 
1240  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1241  data_buffer->setSize(nbytes_var_data_to_keep);
1242  data_buffer->setUpdated();
1243 
1244  index_array[nrows_to_keep] = data_buffer->size();
1245  index_buffer->setSize(sizeof(*index_array) *
1246  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1247  index_buffer->setUpdated();
1248 
1249  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1250  };
1251 
1252  if (is_varlen) {
1253  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1254  } else {
1255  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1256  }
1257  if (threads.size() >= (size_t)cpu_threads()) {
1258  wait_cleanup_threads(threads);
1259  }
1260  }
1261 
1262  wait_cleanup_threads(threads);
1263 
1264  auto key = std::make_pair(td, &fragment);
1265  updel_roll.numTuples[key] = nrows_to_keep;
1266  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1267  auto chunk = chunks[ci];
1268  auto cd = chunk->getColumnDesc();
1269  if (!cd->columnType.is_fixlen_array()) {
1270  updateColumnMetadata(cd,
1271  fragment,
1272  chunk,
1273  has_null_per_thread[ci],
1274  max_double_per_thread[ci],
1275  min_double_per_thread[ci],
1276  max_int64t_per_thread[ci],
1277  min_int64t_per_thread[ci],
1278  cd->columnType,
1279  updel_roll);
1280  }
1281  }
1282 }
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, int8_t &has_null, T &min, T &max)
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:919
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
#define CHECK(condition)
Definition: Logger.h:197
int cpu_threads()
Definition: thread_count.h:24
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
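As the caller graph indicates, compactRows() is driven from updateColumn() when rows flagged in the delete column are vacuumed. A hedged sketch of that sequence (chunk loading, locking, and epoch handling omitted; fragmenter, catalog, td, fragment_id, and delete_chunk, a std::shared_ptr to the fragment's delete-column chunk, are assumed to be in scope):

// Sketch only: vacuum the rows that the boolean delete column marks in one fragment.
const auto victim_offsets = fragmenter.getVacuumOffsets(delete_chunk);
if (!victim_offsets.empty()) {
  UpdelRoll updel_roll;
  fragmenter.compactRows(catalog,
                         td,
                         fragment_id,
                         victim_offsets,
                         Data_Namespace::DISK_LEVEL,
                         updel_roll);
  // updel_roll.numTuples now records the post-vacuum row count keyed by (td, fragment).
}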

void Fragmenter_Namespace::InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams ( )
protected

Definition at line 114 of file InsertOrderFragmenter.cpp.

References catalog_(), DEFAULT_MAX_ROLLBACK_EPOCHS, Data_Namespace::DISK_LEVEL, File_Namespace::FileMgrParams::max_rollback_epochs, and TableDescriptor::maxRollbackEpochs.

114  {
115  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
116  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
117  // storage per table
118  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
119  const TableDescriptor* td =
120  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
121  if (td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) {
122  File_Namespace::FileMgrParams fileMgrParams;
123  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
124  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
125  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
126  }
127  }
128 }
int32_t maxRollbackEpochs
#define DEFAULT_MAX_ROLLBACK_EPOCHS
specifies the content in-memory of a row in the table metadata table
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
Definition: DataMgr.cpp:528
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.

+ Here is the call graph for this function:

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 708 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

709  {
710  // also sets the new fragment as the insertBuffer for each column
711 
712  maxFragmentId_++;
713  auto newFragmentInfo = std::make_unique<FragmentInfo>();
714  newFragmentInfo->fragmentId = maxFragmentId_;
715  newFragmentInfo->shadowNumTuples = 0;
716  newFragmentInfo->setPhysicalNumTuples(0);
717  for (const auto levelSize : dataMgr_->levelSizes_) {
718  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
719  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
720  }
721  newFragmentInfo->physicalTableId = physicalTableId_;
722  newFragmentInfo->shard = shard_;
723 
724  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
725  colMapIt != columnMap_.end();
726  ++colMapIt) {
727  ChunkKey chunkKey = chunkKeyPrefix_;
728  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
729  chunkKey.push_back(maxFragmentId_);
730  colMapIt->second.createChunkBuffer(
731  dataMgr_,
732  chunkKey,
733  memoryLevel,
734  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
735  pageSize_);
736  colMapIt->second.initEncoder();
737  }
738 
739  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
740  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
741  return fragmentInfoVec_.back().get();
742 }
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< int > levelSizes_
Definition: DataMgr.h:210
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:
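Every chunk this method creates (and that deleteFragments() later drops) is addressed by the same four-part key, built by appending to chunkKeyPrefix_ exactly as in the loop above. A minimal illustration with placeholder ids:

// ChunkKey layout used throughout this class: {database_id, table_id, column_id, fragment_id}.
ChunkKey chunk_key = chunk_key_prefix;   // already holds {database_id, table_id}
chunk_key.push_back(column_id);          // which column
chunk_key.push_back(fragment_id);        // which fragment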

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 248 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

248  {
249  // Fix a verified loophole on sharded logical table which is locked using logical
250  // tableId while it's its physical tables that can come here when fragments overflow
251  // during COPY. Locks on a logical table and its physical tables never intersect, which
252  // means potential races. It'll be an overkill to resolve a logical table to physical
253  // tables in DBHandler, ParseNode or other higher layers where the logical table is
254  // locked with Table Read/Write locks; it's easier to lock the logical table of its
255  // physical tables. A downside of this approach may be loss of parallel execution of
256  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
257  // operation, the loss seems not a big deal.
258  auto chunkKeyPrefix = chunkKeyPrefix_;
259  if (shard_ >= 0) {
260  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
261  }
262 
263  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
264  // SELECT and COPY may enter a deadlock
265  const auto delete_lock =
267 
268  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
269 
270  for (const auto fragId : dropFragIds) {
271  for (const auto& col : columnMap_) {
272  int colId = col.first;
273  vector<int> fragPrefix = chunkKeyPrefix_;
274  fragPrefix.push_back(colId);
275  fragPrefix.push_back(fragId);
276  dataMgr_->deleteChunksWithPrefix(fragPrefix);
277  }
278  }
279 }
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:155
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:3909
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:436
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 506 of file InsertOrderFragmenter.cpp.

506  {
507  // prevent concurrent insert rows and drop column
508  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
509  // synchronize concurrent accesses to fragmentInfoVec_
510  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
511  for (auto const& fragmentInfo : fragmentInfoVec_) {
512  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
513  }
514 
515  for (const auto columnId : columnIds) {
516  auto cit = columnMap_.find(columnId);
517  if (columnMap_.end() != cit) {
518  columnMap_.erase(cit);
519  }
520 
521  vector<int> fragPrefix = chunkKeyPrefix_;
522  fragPrefix.push_back(columnId);
523  dataMgr_->deleteChunksWithPrefix(fragPrefix);
524 
525  for (const auto& fragmentInfo : fragmentInfoVec_) {
526  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
527  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
528  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
529  }
530  }
531  }
532  for (const auto& fragmentInfo : fragmentInfoVec_) {
533  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
534  }
535 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:436
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
override virtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 221 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

221  {
222  // not safe to call from outside insertData
223  // b/c depends on insertLock around numTuples_
224 
225  // don't ever drop the only fragment!
226  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
227  return;
228  }
229 
230  if (numTuples_ > maxRows) {
231  size_t preNumTuples = numTuples_;
232  vector<int> dropFragIds;
233  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
234  while (numTuples_ > targetRows) {
235  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
236  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
237  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
238  fragmentInfoVec_.pop_front();
239  CHECK_GE(numTuples_, numFragTuples);
240  numTuples_ -= numFragTuples;
241  }
242  deleteFragments(dropFragIds);
243  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
244  << " post: " << numTuples_ << " maxRows: " << maxRows;
245  }
246 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:188
#define CHECK_GE(x, y)
Definition: Logger.h:210
#define CHECK_GT(x, y)
Definition: Logger.h:209
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
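As a worked example (using an illustrative factor of 0.9 purely for the arithmetic; the actual DROP_FRAGMENT_FACTOR is a compile-time macro in InsertOrderFragmenter.cpp): with maxRows = 1,000,000 the target becomes targetRows = 900,000, so whole fragments are popped from the front of fragmentInfoVec_ and their chunks deleted until numTuples_ falls below 900,000, leaving the table somewhat under maxRows rather than exactly at it.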
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 107 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 130 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, gpu_enabled::sort(), and to_string().

130  {
131  if (uses_foreign_storage_ ||
132  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
133  // memory-resident tables won't have anything on disk
134  ChunkMetadataVector chunk_metadata;
135  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
136 
137  // data comes like this - database_id, table_id, column_id, fragment_id
138  // but lets sort by database_id, table_id, fragment_id, column_id
139 
140  int fragment_subkey_index = 3;
141  std::sort(chunk_metadata.begin(),
142  chunk_metadata.end(),
143  [&](const auto& pair1, const auto& pair2) {
144  return pair1.first[3] < pair2.first[3];
145  });
146 
147  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
148  ++chunk_itr) {
149  int cur_column_id = chunk_itr->first[2];
150  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
151 
152  if (fragmentInfoVec_.empty() ||
153  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
154  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
155  CHECK(new_fragment_info);
156  maxFragmentId_ = cur_fragment_id;
157  new_fragment_info->fragmentId = cur_fragment_id;
158  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
159  numTuples_ += new_fragment_info->getPhysicalNumTuples();
160  for (const auto level_size : dataMgr_->levelSizes_) {
161  new_fragment_info->deviceIds.push_back(
162  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
163  }
164  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
165  new_fragment_info->physicalTableId = physicalTableId_;
166  new_fragment_info->shard = shard_;
167  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
168  } else {
169  if (chunk_itr->second->numElements !=
170  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
171  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
172  std::to_string(physicalTableId_) + ", Column " +
173  std::to_string(cur_column_id) + ". Fragment Tuples: " +
174  std::to_string(
175  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
176  ", Chunk Tuples: " +
177  std::to_string(chunk_itr->second->numElements);
178  }
179  }
180  CHECK(fragmentInfoVec_.back().get());
181  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
182  }
183  }
184 
185  size_t maxFixedColSize = 0;
186 
187  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
188  auto size = colIt->second.getColumnDesc()->columnType.get_size();
189  if (size == -1) { // variable length
190  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
191  size = 8; // b/c we use this for string and array indices - gross to have magic
192  // number here
193  }
194  CHECK_GE(size, 0);
195  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
196  }
197 
198  // this is maximum number of rows assuming everything is fixed length
199  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
200 
201  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
202  // Now need to get the insert buffers for each column - should be last
203  // fragment
204  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
205  // TODO: add accessor here for safe indexing
206  int deviceId =
207  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
208  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
209  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
210  insertKey.push_back(colIt->first); // column id
211  insertKey.push_back(lastFragmentId); // fragment id
212  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
213  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
214  if (varLenColInfoIt != varLenColInfo_.end()) {
215  varLenColInfoIt->second = colIt->second.getBuffer()->size();
216  }
217  }
218  }
219 }
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< int > levelSizes_
Definition: DataMgr.h:210
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:188
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:411
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:197
std::map< int, Chunk_NS::Chunk > columnMap_
int compute_device_for_fragment(const int table_id, const int fragment_id, const int num_devices)

+ Here is the call graph for this function:
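The clamp on line 199 caps the per-fragment row count so that the widest fixed-length column cannot overflow maxChunkSize_ in a single chunk. As a worked example, with a 1 GB (2^30-byte) maxChunkSize_ and a widest fixed-length column of 8 bytes (also the size assumed above for variable-length index entries), a fragment is limited to 2^30 / 8 = 134,217,728 rows, and maxFragmentRows_ is lowered to that value if it was configured higher.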

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 969 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

972  {
973  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
974  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
975  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
976  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
977  ++ncol;
978  if (!cd->isVirtualCol) {
979  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
980  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
981  ChunkKey chunk_key{
982  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
983  auto chunk = Chunk_NS::Chunk::getChunk(cd,
984  &catalog_->getDataMgr(),
985  chunk_key,
986  memory_level,
987  0,
988  chunk_meta_it->second->numBytes,
989  chunk_meta_it->second->numElements);
990  chunks.push_back(chunk);
991  }
992  }
993  }
994  return chunks;
995 }
std::vector< int > ChunkKey
Definition: types.h:37
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inline override virtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 106 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

106 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inline override virtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 111 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
override virtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 384 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

384  {
385  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
386  fragmentInfoVec_.end(),
387  [fragment_id](const auto& fragment) -> bool {
388  return fragment->fragmentId == fragment_id;
389  });
390  CHECK(fragment_it != fragmentInfoVec_.end());
391  return fragment_it->get();
392 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
override virtual

returns (inside a TableInfo object) the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 744 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

744  {
745  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
746  TableInfo queryInfo;
747  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
748  // right now we don't test predicate, so just return (copy of) all fragments
749  bool fragmentsExist = false;
750  if (fragmentInfoVec_.empty()) {
751  // If we have no fragments add a dummy empty fragment to make the executor
752  // not have separate logic for 0-row tables
753  int maxFragmentId = 0;
754  FragmentInfo emptyFragmentInfo;
755  emptyFragmentInfo.fragmentId = maxFragmentId;
756  emptyFragmentInfo.shadowNumTuples = 0;
757  emptyFragmentInfo.setPhysicalNumTuples(0);
758  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
759  emptyFragmentInfo.physicalTableId = physicalTableId_;
760  emptyFragmentInfo.shard = shard_;
761  queryInfo.fragments.push_back(emptyFragmentInfo);
762  } else {
763  fragmentsExist = true;
764  std::for_each(
765  fragmentInfoVec_.begin(),
766  fragmentInfoVec_.end(),
767  [&queryInfo](const auto& fragment_owned_ptr) {
768  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
769  });
770  }
771  readLock.unlock();
772  queryInfo.setPhysicalNumTuples(0);
773  auto partIt = queryInfo.fragments.begin();
774  if (fragmentsExist) {
775  while (partIt != queryInfo.fragments.end()) {
776  if (partIt->getPhysicalNumTuples() == 0) {
777  // this means that a concurrent insert query inserted tuples into a new fragment
778  // but when the query came in we didn't have this fragment. To make sure we don't
779  // mess up the executor we delete this fragment from the metadatamap (fixes
780  // earlier bug found 2015-05-08)
781  partIt = queryInfo.fragments.erase(partIt);
782  } else {
783  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
784  partIt->getPhysicalNumTuples());
785  ++partIt;
786  }
787  }
788  } else {
789  // We added a dummy fragment and know the table is empty
790  queryInfo.setPhysicalNumTuples(0);
791  }
792  return queryInfo;
793 }
std::vector< int > levelSizes_
Definition: DataMgr.h:210
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_

+ Here is the call graph for this function:
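A hedged sketch of how a consumer typically walks the returned TableInfo (process_fragment is a hypothetical callback, not part of this API):

// Sketch: iterate the copied fragment metadata returned by getFragmentsForQuery().
auto query_info = fragmenter.getFragmentsForQuery();
for (const auto& fragment : query_info.fragments) {
  // process_fragment: hypothetical consumer of a fragment id and its row count.
  process_fragment(fragment.fragmentId, fragment.getPhysicalNumTuples());
}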

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inline override virtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 998 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

999  {
1000  const auto data_buffer = chunk->getBuffer();
1001  const auto data_addr = data_buffer->getMemoryPtr();
1002  const size_t nrows_in_chunk = data_buffer->size();
1003  const size_t ncore = cpu_threads();
1004  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1005  std::vector<std::vector<uint64_t>> deleted_offsets;
1006  deleted_offsets.resize(ncore);
1007  std::vector<std::future<void>> threads;
1008  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1009  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1010  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1011  const auto ithread = rbegin / segsz;
1012  CHECK(ithread < deleted_offsets.size());
1013  deleted_offsets[ithread].reserve(segsz);
1014  for (size_t r = rbegin; r < rend; ++r) {
1015  if (data_addr[r]) {
1016  deleted_offsets[ithread].push_back(r);
1017  }
1018  }
1019  }));
1020  }
1021  wait_cleanup_threads(threads);
1022  std::vector<uint64_t> all_deleted_offsets;
1023  for (size_t i = 0; i < ncore; ++i) {
1024  all_deleted_offsets.insert(
1025  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1026  }
1027  return all_deleted_offsets;
1028 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
#define CHECK(condition)
Definition: Logger.h:197
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
override virtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 537 of file InsertOrderFragmenter.cpp.

References CHECK.

537  {
538  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
539 
540  for (auto const& fragment : fragmentInfoVec_) {
541  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
542  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
543  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
544  if (chunk_stats.max.tinyintval == 1) {
545  return true;
546  }
547  }
548  return false;
549 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:197
void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
override virtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 394 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

394  {
395  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
396  try {
397  mapd_unique_lock<mapd_shared_mutex> insertLock(
398  insertMutex_); // prevent two threads from trying to insert into the same table
399  // simultaneously
400 
401  insertDataImpl(insertDataStruct);
402 
403  if (defaultInsertLevel_ ==
404  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
405  dataMgr_->checkpoint(
406  chunkKeyPrefix_[0],
407  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
408  }
409  } catch (...) {
410  auto table_epochs =
411  catalog_->getTableEpochs(insertDataStruct.databaseId, insertDataStruct.tableId);
412 
413  // the statement below deletes *this* object!
414  // relying on exception propagation at this stage
415  // until we can sort this out in a cleaner fashion
416  catalog_->setTableEpochs(insertDataStruct.databaseId, table_epochs);
417  throw;
418  }
419 }
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs)
Definition: Catalog.cpp:2973
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:489
void insertDataImpl(InsertData &insertDataStruct)
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2945

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
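A hedged sketch of the caller side (db_id, table_id, column_id, and values are placeholders; only the InsertData fields referenced above are populated, and the column is assumed to be a BIGINT so its values can be passed through numbersPtr):

// Sketch: append the contents of `values` (std::vector<int64_t>) as one column of new rows.
Fragmenter_Namespace::InsertData insert_data;
insert_data.databaseId = db_id;
insert_data.tableId = table_id;
insert_data.columnIds = {column_id};
insert_data.numRows = values.size();

DataBlockPtr block;
block.numbersPtr = reinterpret_cast<int8_t*>(values.data());
insert_data.data.push_back(block);

// Takes the insert lock and, for disk-resident tables, checkpoints after the append.
fragmenter.insertData(insert_data);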

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 551 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

551  {
552  // populate deleted system column if it should exists, as it will not come from client
553  // Do not add this magical column in the replicate ALTER TABLE ADD route as
554  // it is not needed and will cause issues
555  std::unique_ptr<int8_t[]> data_for_deleted_column;
556  for (const auto& cit : columnMap_) {
557  if (cit.second.getColumnDesc()->isDeletedCol &&
558  insertDataStruct.replicate_count == 0) {
559  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
560  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
561  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
562  insertDataStruct.columnIds.push_back(cit.second.getColumnDesc()->columnId);
563  break;
564  }
565  }
566  // MAT we need to add a removal of the empty column we pushed onto here
567  // for upstream safety. Should not be a problem but need to fix.
568 
569  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
570  for (const auto columnId : insertDataStruct.columnIds) {
571  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
572  CHECK(columnDesc);
573  if (columnMap_.end() == columnMap_.find(columnId)) {
574  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
575  }
576  }
577 
578  // when replicate (add) column(s), this path seems wont work; go separate route...
579  if (insertDataStruct.replicate_count > 0) {
580  replicateData(insertDataStruct);
581  return;
582  }
583 
584  std::unordered_map<int, int> inverseInsertDataColIdMap;
585  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
586  inverseInsertDataColIdMap.insert(
587  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
588  }
589 
590  size_t numRowsLeft = insertDataStruct.numRows;
591  size_t numRowsInserted = 0;
592  vector<DataBlockPtr> dataCopy =
593  insertDataStruct.data; // bc append data will move ptr forward and this violates
594  // constness of InsertData
595  if (numRowsLeft <= 0) {
596  return;
597  }
598 
599  FragmentInfo* currentFragment{nullptr};
600 
601  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
602  currentFragment = createNewFragment(defaultInsertLevel_);
603  } else {
604  currentFragment = fragmentInfoVec_.back().get();
605  }
606  CHECK(currentFragment);
607 
608  size_t startFragment = fragmentInfoVec_.size() - 1;
609 
610  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
611  // loop until done inserting all rows
612  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
613  size_t rowsLeftInCurrentFragment =
614  maxFragmentRows_ - currentFragment->shadowNumTuples;
615  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
616  if (rowsLeftInCurrentFragment != 0) {
617  for (auto& varLenColInfoIt : varLenColInfo_) {
618  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
619  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
620  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
621  if (insertIdIt != inverseInsertDataColIdMap.end()) {
622  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
623  numRowsToInsert = std::min(
624  numRowsToInsert,
625  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
626  numRowsToInsert,
627  numRowsInserted,
628  bytesLeft));
629  }
630  }
631  }
632 
633  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
634  currentFragment = createNewFragment(defaultInsertLevel_);
635  if (numRowsInserted == 0) {
636  startFragment++;
637  }
638  rowsLeftInCurrentFragment = maxFragmentRows_;
639  for (auto& varLenColInfoIt : varLenColInfo_) {
640  varLenColInfoIt.second = 0; // reset byte counter
641  }
642  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
643  for (auto& varLenColInfoIt : varLenColInfo_) {
644  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
645  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
646  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
647  if (insertIdIt != inverseInsertDataColIdMap.end()) {
648  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
649  numRowsToInsert = std::min(
650  numRowsToInsert,
651  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
652  numRowsToInsert,
653  numRowsInserted,
654  bytesLeft));
655  }
656  }
657  }
658 
659  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
660  // never be able to insert anything
661 
662  {
663  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
664 
665  // for each column, append the data in the appropriate insert buffer
666  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
667  int columnId = insertDataStruct.columnIds[i];
668  auto colMapIt = columnMap_.find(columnId);
669  CHECK(colMapIt != columnMap_.end());
670  currentFragment->shadowChunkMetadataMap[columnId] =
671  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
672  auto varLenColInfoIt = varLenColInfo_.find(columnId);
673  if (varLenColInfoIt != varLenColInfo_.end()) {
674  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
675  }
676  }
677  if (hasMaterializedRowId_) {
678  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
679  currentFragment->shadowNumTuples;
680  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
681  for (size_t i = 0; i < numRowsToInsert; ++i) {
682  row_id_data[i] = i + startId;
683  }
684  DataBlockPtr rowIdBlock;
685  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
686  auto colMapIt = columnMap_.find(rowIdColId_);
687  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
688  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
689  }
690 
691  currentFragment->shadowNumTuples =
692  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
693  numRowsLeft -= numRowsToInsert;
694  numRowsInserted += numRowsToInsert;
695  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
696  partIt != fragmentInfoVec_.end();
697  ++partIt) {
698  auto fragment_ptr = partIt->get();
699  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
700  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
701  }
702  }
703  }
704  numTuples_ += insertDataStruct.numRows;
705  dropFragmentsToSize(maxRows_);
706 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void dropFragmentsToSize(const size_t maxRows) override
Will truncate table to less than maxRows by dropping fragments.
#define CHECK_GT(x, y)
Definition: Logger.h:209
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:197
void replicateData(const InsertData &insertDataStruct)
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:217

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
override virtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these must be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 421 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

421  {
422  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
423  mapd_unique_lock<mapd_shared_mutex> insertLock(
424  insertMutex_); // prevent two threads from trying to insert into the same table
425  // simultaneously
426  insertDataImpl(insertDataStruct);
427 }
void insertDataImpl(InsertData &insertDataStruct)

+ Here is the caller graph for this function:
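Because no checkpoint is taken here, durability is the caller's responsibility. A sketch of the pattern (identifiers assumed in scope; the checkpoint call follows the DataMgr signature shown in insertData() above):

// Sketch: batch several appends, then persist once.
fragmenter.insertDataNoCheckpoint(insert_data);   // appends rows, no checkpoint
// ... further inserts or related updates ...
data_mgr->checkpoint(db_id, table_id);            // caller decides when to flush to disk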

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter & )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 429 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, catalog_(), CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

429  {
430  // synchronize concurrent accesses to fragmentInfoVec_
431  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
432  size_t numRowsLeft = insertDataStruct.numRows;
433  for (auto const& fragmentInfo : fragmentInfoVec_) {
434  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
435  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
436  size_t numRowsCanBeInserted;
437  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
438  if (insertDataStruct.bypass[i]) {
439  continue;
440  }
441  auto columnId = insertDataStruct.columnIds[i];
442  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
443  CHECK(colDesc);
444  CHECK(columnMap_.find(columnId) != columnMap_.end());
445 
446  ChunkKey chunkKey = chunkKeyPrefix_;
447  chunkKey.push_back(columnId);
448  chunkKey.push_back(fragmentInfo->fragmentId);
449 
450  auto colMapIt = columnMap_.find(columnId);
451  auto& chunk = colMapIt->second;
452  if (chunk.isChunkOnDevice(
453  dataMgr_,
454  chunkKey,
455  defaultInsertLevel_,
456  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
457  dataMgr_->deleteChunksWithPrefix(chunkKey);
458  }
459  chunk.createChunkBuffer(
460  dataMgr_,
461  chunkKey,
462  defaultInsertLevel_,
463  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
464  chunk.initEncoder();
465 
466  try {
467  DataBlockPtr dataCopy = insertDataStruct.data[i];
468  auto size = colDesc->columnType.get_size();
469  if (0 > size) {
470  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
471  varLenColInfo_[columnId] = 0;
472  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
473  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
474  } else {
475  numRowsCanBeInserted = maxChunkSize_ / size;
476  }
477 
478  // FIXME: abort a case in which new column is wider than existing columns
479  if (numRowsCanBeInserted < numRowsToInsert) {
480  throw std::runtime_error("new column '" + colDesc->columnName +
481  "' wider than existing columns is not supported");
482  }
483 
484  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
485  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
486 
487  // update total size of var-len column in (actually the last) fragment
488  if (0 > size) {
489  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
490  varLenColInfo_[columnId] = chunk.getBuffer()->size();
491  }
492  } catch (...) {
493  dataMgr_->deleteChunksWithPrefix(chunkKey);
494  throw;
495  }
496  }
497  numRowsLeft -= numRowsToInsert;
498  }
499  CHECK(0 == numRowsLeft);
500 
501  for (auto const& fragmentInfo : fragmentInfoVec_) {
502  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
503  }
504 }
std::vector< int > ChunkKey
Definition: types.h:37
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:436
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:197
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inline override virtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
override virtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 294 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

296  {
297  // synchronize concurrent accesses to fragmentInfoVec_
298  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
304  if (shard_ >= 0) {
305  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
306  }
307 
308  CHECK(cd);
309  const auto column_id = cd->columnId;
310  const auto col_itr = columnMap_.find(column_id);
311  CHECK(col_itr != columnMap_.end());
312 
313  for (auto const& fragment : fragmentInfoVec_) {
314  auto stats_itr = stats_map.find(fragment->fragmentId);
315  if (stats_itr != stats_map.end()) {
316  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
317  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
318  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
319  physicalTableId_,
320  column_id,
321  fragment->fragmentId};
322  auto chunk = Chunk_NS::Chunk::getChunk(cd,
323  &catalog_->getDataMgr(),
324  chunk_key,
326  0,
327  chunk_meta_it->second->numBytes,
328  chunk_meta_it->second->numElements);
329  auto buf = chunk->getBuffer();
330  CHECK(buf);
331  if (!buf->hasEncoder()) {
332  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
333  }
334  auto encoder = buf->getEncoder();
335 
336  auto chunk_stats = stats_itr->second;
337 
338  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
339  encoder->getMetadata(old_chunk_metadata);
340  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
341 
342  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
343  // Use the logical type to display data, since the encoding should be ignored
344  const auto logical_ti = cd->columnType.is_dict_encoded_string()
346  : get_logical_type_info(cd->columnType);
347  if (!didResetStats) {
348  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
349  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
350  << DatumToString(chunk_stats.max, logical_ti);
351  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
352  << DatumToString(chunk_stats.min, logical_ti);
353  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
354  continue; // move to next fragment
355  }
356 
357  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
358  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
359  << DatumToString(chunk_stats.max, logical_ti);
360  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
361  << DatumToString(chunk_stats.min, logical_ti);
362  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
363 
364  // Reset fragment metadata map and set buffer to dirty
365  auto new_metadata = std::make_shared<ChunkMetadata>();
366  // Run through fillChunkStats to ensure any transformations to the raw metadata
367  // values get applied (e.g. for date in days)
368  encoder->getMetadata(new_metadata);
369 
370  fragment->setChunkMetadata(column_id, new_metadata);
371  fragment->shadowChunkMetadataMap =
372  fragment->getChunkMetadataMap(); // TODO(adb): needed?
373  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
374  buf->setDirty();
375  }
376  } else {
377  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
378  << ", table " << physicalTableId_ << ", "
379  << ", column " << column_id;
380  }
381  }
382 }
std::vector< int > ChunkKey
Definition: types.h:37
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:240
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
#define LOG(tag)
Definition: Logger.h:188
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:893
std::string show_chunk(const ChunkKey &key)
Definition: types.h:73
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
#define CHECK(condition)
Definition: Logger.h:197
bool is_dict_encoded_string() const
Definition: sqltypes.h:512
SQLTypeInfo columnType
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:291

+ Here is the call graph for this function:
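
For orientation, here is a minimal caller-side sketch (not part of the generated reference) of driving updateChunkStats for one column across two fragments. The helper lambda, the Datum field used for the integer stats, and the assumption that the caller already holds the table-level lock described in the warning are all illustrative.

// Sketch only: reset stats for one column across two fragments. Assumes the
// relevant OmniSciDB headers (ColumnDescriptor, ChunkStats, the fragmenter)
// are available and that a higher-level table write lock is already held.
#include <unordered_map>

void reset_column_stats(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                        const ColumnDescriptor* cd) {
  auto make_stats = [](int64_t min_v, int64_t max_v, bool has_nulls) {
    ChunkStats stats;
    stats.min.bigintval = min_v;  // Datum member used for integer types (assumed)
    stats.max.bigintval = max_v;
    stats.has_nulls = has_nulls;
    return stats;
  };
  // fragment id -> replacement stats; fragments absent from the map are left
  // untouched (the implementation logs a warning for them).
  std::unordered_map<int, ChunkStats> stats_map{{0, make_stats(1, 100, false)},
                                                {1, make_stats(5, 42, true)}};
  fragmenter.updateChunkStats(cd, stats_map);
}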

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 571 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

579  {
580  updel_roll.catalog = catalog;
581  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
582  updel_roll.memoryLevel = memory_level;
583 
584  const size_t ncore = cpu_threads();
585  const auto nrow = frag_offsets.size();
586  const auto n_rhs_values = rhs_values.size();
587  if (0 == nrow) {
588  return;
589  }
590  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
591 
592  auto fragment_ptr = getFragmentInfo(fragment_id);
593  auto& fragment = *fragment_ptr;
594  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
595  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
596  ChunkKey chunk_key{
597  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
598  auto chunk = Chunk_NS::Chunk::getChunk(cd,
599  &catalog->getDataMgr(),
600  chunk_key,
601  Data_Namespace::CPU_LEVEL,
602  0,
603  chunk_meta_it->second->numBytes,
604  chunk_meta_it->second->numElements);
605 
606  std::vector<int8_t> has_null_per_thread(ncore, 0);
607  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
608  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
609  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
610  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
611 
612  // parallel update elements
613  std::vector<std::future<void>> threads;
614 
615  const auto segsz = (nrow + ncore - 1) / ncore;
616  auto dbuf = chunk->getBuffer();
617  auto dbuf_addr = dbuf->getMemoryPtr();
618  dbuf->setUpdated();
619  {
620  std::lock_guard<std::mutex> lck(updel_roll.mutex);
621  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
622  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
623  }
624 
625  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
626  cd->tableId,
627  cd->columnId,
628  fragment.fragmentId};
629  updel_roll.dirtyChunkeys.insert(chunkey);
630  }
631  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
632  threads.emplace_back(std::async(
633  std::launch::async,
634  [=,
635  &has_null_per_thread,
636  &min_int64t_per_thread,
637  &max_int64t_per_thread,
638  &min_double_per_thread,
639  &max_double_per_thread,
640  &frag_offsets,
641  &rhs_values] {
642  SQLTypeInfo lhs_type = cd->columnType;
643 
644  // !! not sure if this is a undocumented convention or a bug, but for a sharded
645  // table the dictionary id of a encoded string column is not specified by
646  // comp_param in physical table but somehow in logical table :) comp_param in
647  // physical table is always 0, so need to adapt accordingly...
648  auto cdl = (shard_ < 0)
649  ? cd
650  : catalog->getMetadataForColumn(
651  catalog->getLogicalTableId(td->tableId), cd->columnId);
652  CHECK(cdl);
653  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
654  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
655  lhs_type, &decimalOverflowValidator);
656  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
657  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
658  lhs_type, &dateDaysOverflowValidator);
659 
660  StringDictionary* stringDict{nullptr};
661  if (lhs_type.is_string()) {
662  CHECK(kENCODING_DICT == lhs_type.get_compression());
663  auto dictDesc = const_cast<DictDescriptor*>(
664  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
665  CHECK(dictDesc);
666  stringDict = dictDesc->stringDict.get();
667  CHECK(stringDict);
668  }
669 
670  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
671  const auto roffs = frag_offsets[r];
672  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
673  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
674  ScalarTargetValue sv2;
675 
676  // Subtle here is on the two cases of string-to-string assignments, when
677  // upstream passes RHS string as a string index instead of a preferred "real
678  // string".
679  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
680  // index
681  // in this layer, so if upstream passes a str idx here, an
682  // exception is thrown.
683  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
684  // str idx.
685  if (rhs_type.is_string()) {
686  if (const auto vp = boost::get<int64_t>(sv)) {
687  auto dictDesc = const_cast<DictDescriptor*>(
688  catalog->getMetadataForDict(rhs_type.get_comp_param()));
689  if (nullptr == dictDesc) {
690  throw std::runtime_error(
691  "UPDATE does not support cast from string literal to string "
692  "column.");
693  }
694  auto stringDict = dictDesc->stringDict.get();
695  CHECK(stringDict);
696  sv2 = NullableString(stringDict->getString(*vp));
697  sv = &sv2;
698  }
699  }
700 
701  if (const auto vp = boost::get<int64_t>(sv)) {
702  auto v = *vp;
703  if (lhs_type.is_string()) {
704  throw std::runtime_error("UPDATE does not support cast to string.");
705  }
706  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
707  if (lhs_type.is_decimal()) {
708  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
709  int64_t decimal_val;
710  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
711  tabulate_metadata(lhs_type,
712  min_int64t_per_thread[c],
713  max_int64t_per_thread[c],
714  has_null_per_thread[c],
715  (v == inline_int_null_value<int64_t>() &&
716  lhs_type.get_notnull() == false)
717  ? v
718  : decimal_val);
719  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
720  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
721  if (positive_v_and_negative_d || negative_v_and_positive_d) {
722  throw std::runtime_error(
723  "Data conversion overflow on " + std::to_string(v) +
724  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
725  std::to_string(rhs_type.get_scale()) + ") to (" +
726  std::to_string(lhs_type.get_dimension()) + ", " +
727  std::to_string(lhs_type.get_scale()) + ")");
728  }
729  } else if (is_integral(lhs_type)) {
730  if (lhs_type.is_date_in_days()) {
731  // Store meta values in seconds
732  if (lhs_type.get_size() == 2) {
733  nullAwareDateOverflowValidator.validate<int16_t>(v);
734  } else {
735  nullAwareDateOverflowValidator.validate<int32_t>(v);
736  }
737  int64_t days;
738  get_scalar<int64_t>(data_ptr, lhs_type, days);
739  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
740  tabulate_metadata(lhs_type,
741  min_int64t_per_thread[c],
742  max_int64t_per_thread[c],
743  has_null_per_thread[c],
744  (v == inline_int_null_value<int64_t>() &&
745  lhs_type.get_notnull() == false)
746  ? NullSentinelSupplier()(lhs_type, v)
747  : seconds);
748  } else {
749  int64_t target_value;
750  if (rhs_type.is_decimal()) {
751  target_value = round(decimal_to_double(rhs_type, v));
752  } else {
753  target_value = v;
754  }
755  tabulate_metadata(lhs_type,
756  min_int64t_per_thread[c],
757  max_int64t_per_thread[c],
758  has_null_per_thread[c],
759  target_value);
760  }
761  } else {
762  tabulate_metadata(
763  lhs_type,
764  min_double_per_thread[c],
765  max_double_per_thread[c],
766  has_null_per_thread[c],
767  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
768  }
769  } else if (const auto vp = boost::get<double>(sv)) {
770  auto v = *vp;
771  if (lhs_type.is_string()) {
772  throw std::runtime_error("UPDATE does not support cast to string.");
773  }
774  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
775  if (lhs_type.is_integer()) {
776  tabulate_metadata(lhs_type,
777  min_int64t_per_thread[c],
778  max_int64t_per_thread[c],
779  has_null_per_thread[c],
780  int64_t(v));
781  } else if (lhs_type.is_fp()) {
782  tabulate_metadata(lhs_type,
783  min_double_per_thread[c],
784  max_double_per_thread[c],
785  has_null_per_thread[c],
786  double(v));
787  } else {
788  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
789  "LHS with a floating RHS.";
790  }
791  } else if (const auto vp = boost::get<float>(sv)) {
792  auto v = *vp;
793  if (lhs_type.is_string()) {
794  throw std::runtime_error("UPDATE does not support cast to string.");
795  }
796  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
797  if (lhs_type.is_integer()) {
798  tabulate_metadata(lhs_type,
799  min_int64t_per_thread[c],
800  max_int64t_per_thread[c],
801  has_null_per_thread[c],
802  int64_t(v));
803  } else {
804  tabulate_metadata(lhs_type,
805  min_double_per_thread[c],
806  max_double_per_thread[c],
807  has_null_per_thread[c],
808  double(v));
809  }
810  } else if (const auto vp = boost::get<NullableString>(sv)) {
811  const auto s = boost::get<std::string>(vp);
812  const auto sval = s ? *s : std::string("");
813  if (lhs_type.is_string()) {
814  decltype(stringDict->getOrAdd(sval)) sidx;
815  {
816  std::unique_lock<std::mutex> lock(temp_mutex_);
817  sidx = stringDict->getOrAdd(sval);
818  }
819  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
820  tabulate_metadata(lhs_type,
821  min_int64t_per_thread[c],
822  max_int64t_per_thread[c],
823  has_null_per_thread[c],
824  int64_t(sidx));
825  } else if (sval.size() > 0) {
826  auto dval = std::atof(sval.data());
827  if (lhs_type.is_boolean()) {
828  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
829  } else if (lhs_type.is_time()) {
830  throw std::runtime_error(
831  "Date/Time/Timestamp update not supported through translated "
832  "string path.");
833  }
834  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
835  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
836  tabulate_metadata(lhs_type,
837  min_double_per_thread[c],
838  max_double_per_thread[c],
839  has_null_per_thread[c],
840  double(dval));
841  } else {
842  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
843  tabulate_metadata(lhs_type,
844  min_int64t_per_thread[c],
845  max_int64t_per_thread[c],
846  has_null_per_thread[c],
847  int64_t(dval));
848  }
849  } else {
850  put_null(data_ptr, lhs_type, cd->columnName);
851  has_null_per_thread[c] = true;
852  }
853  } else {
854  CHECK(false);
855  }
856  }
857  }));
858  if (threads.size() >= (size_t)cpu_threads()) {
859  wait_cleanup_threads(threads);
860  }
861  }
862  wait_cleanup_threads(threads);
863 
864  // for unit test
865  if (Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_) {
866  if (cd->isDeletedCol) {
867  const auto deleted_offsets = getVacuumOffsets(chunk);
868  if (deleted_offsets.size() > 0) {
869  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
870  return;
871  }
872  }
873  }
874  bool has_null_per_chunk{false};
875  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
876  double min_double_per_chunk{std::numeric_limits<double>::max()};
877  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
878  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
879  for (size_t c = 0; c < ncore; ++c) {
880  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
881  max_double_per_chunk =
882  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
883  min_double_per_chunk =
884  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
885  max_int64t_per_chunk =
886  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
887  min_int64t_per_chunk =
888  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
889  }
890  updateColumnMetadata(cd,
891  fragment,
892  chunk,
893  has_null_per_chunk,
894  max_double_per_chunk,
895  min_double_per_chunk,
896  max_int64t_per_chunk,
897  min_int64t_per_chunk,
898  cd->columnType,
899  updel_roll);
900 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:65
std::vector< int > ChunkKey
Definition: types.h:37
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:222
std::map< Chunk_NS::Chunk *, std::shared_ptr< Chunk_NS::Chunk > > dirtyChunks
Definition: UpdelRoll.h:52
HOST DEVICE int get_scale() const
Definition: sqltypes.h:316
std::set< ChunkKey > dirtyChunkeys
Definition: UpdelRoll.h:53
#define UNREACHABLE()
Definition: Logger.h:241
std::string to_string(char const *&&v)
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
int64_t get_epoch_seconds_from_days(const int64_t days)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:63
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:3909
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1439
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void tabulate_metadata(TYPE_INFO const &ti, T &min, T &max, int8_t &null_flag, T const val, SENTINEL_SUPPLIER s=SENTINEL_SUPPLIER())
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:313
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
int logicalTableId
Definition: UpdelRoll.h:64
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
#define CHECK(condition)
Definition: Logger.h:197
Descriptor for a dictionary for a string column.
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:478
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:481
std::string columnName
std::mutex mutex
Definition: UpdelRoll.h:49
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
bool is_integral(const SQLTypeInfo &t)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
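
As a rough illustration of the call shape (an assumption-heavy sketch, not the engine's actual UPDATE execution path), the following sets an integer column at three row offsets of one fragment to a single broadcast literal. All object handles are assumed to come from the usual Catalog lookups, and committing or rolling back the UpdelRoll is left to the caller.

// Sketch: update rows 3, 7 and 9 of fragment 0 to the value 7. A single RHS
// value is broadcast to every offset (nrow == n_rhs_values or n_rhs_values == 1,
// per the CHECK at the top of the implementation). Error handling omitted.
#include <cstdint>
#include <vector>

void example_update(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                    const Catalog_Namespace::Catalog* catalog,
                    const TableDescriptor* td,
                    const ColumnDescriptor* cd) {
  UpdelRoll updel_roll;
  const std::vector<uint64_t> frag_offsets{3, 7, 9};
  const std::vector<ScalarTargetValue> rhs_values{ScalarTargetValue(int64_t(7))};
  fragmenter.updateColumn(catalog,
                          td,
                          cd,
                          /*fragment_id=*/0,
                          frag_offsets,
                          rhs_values,
                          SQLTypeInfo(kBIGINT, /*notnull=*/false),  // RHS type (assumed ctor)
                          Data_Namespace::CPU_LEVEL,
                          updel_roll);
}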

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 54 of file UpdelStorage.cpp.

References updateColumn().

62  {
63  updateColumn(catalog,
64  td,
65  cd,
66  fragment_id,
67  frag_offsets,
68  std::vector<ScalarTargetValue>(1, rhs_value),
69  rhs_type,
70  memory_level,
71  updel_roll);
72 }
void updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor *  cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata >  metadata 
)
override virtual

Updates the metadata for a column chunk.

Parameters
cd- ColumnDescriptor for the column
fragment_id- Fragment id of the chunk within the column
metadata- shared_ptr of the metadata to update column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 281 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

284  {
285  // synchronize concurrent accesses to fragmentInfoVec_
286  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
287 
288  CHECK(metadata.get());
289  auto fragment_info = getFragmentInfo(fragment_id);
290  CHECK(fragment_info);
291  fragment_info->setChunkMetadata(cd->columnId, metadata);
292 }
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define CHECK(condition)
Definition: Logger.h:197
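
A small hedged sketch of how a caller might rebuild and install a chunk's metadata through this method, reusing the encoder->getMetadata pattern shown in the updateChunkStats body above; the function and parameter names are illustrative.

// Sketch: pull the current metadata off a chunk buffer's encoder and install it
// on the fragment. The buffer would typically come from Chunk_NS::Chunk::getChunk.
#include <memory>

void refresh_chunk_metadata(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                            const ColumnDescriptor* cd,
                            const int fragment_id,
                            Data_Namespace::AbstractBuffer* buf) {
  auto metadata = std::make_shared<ChunkMetadata>();
  buf->getEncoder()->getMetadata(metadata);  // same call shape as in updateChunkStats
  fragmenter.updateColumnChunkMetadata(cd, fragment_id, metadata);
}
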
void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 902 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

911  {
912  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
913  auto key = std::make_pair(td, &fragment);
914  std::lock_guard<std::mutex> lck(updel_roll.mutex);
915  if (0 == updel_roll.chunkMetadata.count(key)) {
916  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
917  }
918  if (0 == updel_roll.numTuples.count(key)) {
919  updel_roll.numTuples[key] = fragment.shadowNumTuples;
920  }
921  auto& chunkMetadata = updel_roll.chunkMetadata[key];
922 
923  auto buffer = chunk->getBuffer();
924  const auto& lhs_type = cd->columnType;
925 
926  auto encoder = buffer->getEncoder();
927  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
928  static_assert(std::is_same<decltype(min), decltype(max)>::value,
929  "Type mismatch on min/max");
930  if (has_null) {
931  encoder->updateStats(decltype(min)(), true);
932  }
933  if (max < min) {
934  return;
935  }
936  encoder->updateStats(min, false);
937  encoder->updateStats(max, false);
938  };
939 
940  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
941  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
942  } else if (lhs_type.is_fp()) {
943  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
944  } else if (lhs_type.is_decimal()) {
945  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
946  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
947  has_null_per_chunk);
948  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
949  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
950  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
951  }
952  buffer->getEncoder()->getMetadata(chunkMetadata[cd->columnId]);
953 }
void update_stats(Encoder *encoder, const SQLTypeInfo &column_type, DataBlockPtr data_block, const size_t row_count)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:63
std::map< MetaDataKey, ChunkMetadataMap > chunkMetadata
Definition: UpdelRoll.h:59
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_decimal() const
Definition: sqltypes.h:481
std::mutex mutex
Definition: UpdelRoll.h:49
bool is_integral(const SQLTypeInfo &t)
std::map< MetaDataKey, size_t > numTuples
Definition: UpdelRoll.h:56

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
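
The decimal branch above rescales the per-chunk double extrema into the stored integer representation (value * 10^scale) before handing them to the encoder. A self-contained worked example of that arithmetic, with values chosen to be exactly representable in binary floating point:

#include <cmath>
#include <cstdint>
#include <iostream>

int main() {
  const int scale = 2;  // lhs_type.get_scale() for a DECIMAL(10, 2) column
  const double min_double_per_chunk = 1.25;
  const double max_double_per_chunk = 42.5;
  const auto min_scaled =
      static_cast<int64_t>(min_double_per_chunk * std::pow(10, scale));
  const auto max_scaled =
      static_cast<int64_t>(max_double_per_chunk * std::pow(10, scale));
  std::cout << min_scaled << " " << max_scaled << "\n";  // prints: 125 4250
}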

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll,
Executor *  executor 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 267 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK, checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

277  {
278  updelRoll.is_varlen_update = true;
279  updelRoll.catalog = catalog;
280  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
281  updelRoll.memoryLevel = memoryLevel;
282 
283  size_t num_entries = sourceDataProvider.getEntryCount();
284  size_t num_rows = sourceDataProvider.getRowCount();
285 
286  if (0 == num_rows) {
287  // bail out early
288  return;
289  }
290 
291  TargetValueConverterFactory factory;
292 
293  auto fragment_ptr = getFragmentInfo(fragmentId);
294  auto& fragment = *fragment_ptr;
295  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
296  get_chunks(catalog, td, fragment, memoryLevel, chunks);
297  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
298  columnDescriptors.size());
299  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
300 
301  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
302  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
303  auto chunk = chunks[indexOfChunk];
304  const auto chunk_cd = chunk->getColumnDesc();
305 
306  if (chunk_cd->isDeletedCol) {
307  deletedChunk = chunk;
308  continue;
309  }
310 
311  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
312  columnDescriptors.end(),
313  [=](const ColumnDescriptor* cd) -> bool {
314  return cd->columnId == chunk_cd->columnId;
315  });
316 
317  if (targetColumnIt != columnDescriptors.end()) {
318  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
319 
320  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
321  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
322 
323  ConverterCreateParameter param{
324  num_rows,
325  *catalog,
326  sourceDataMetaInfo,
327  targetDescriptor,
328  targetDescriptor->columnType,
329  !targetDescriptor->columnType.get_notnull(),
330  sourceDataProvider.getLiteralDictionary(),
331  g_enable_experimental_string_functions
332  ? executor->getStringDictionaryProxy(
333  sourceDataMetaInfo.get_type_info().get_comp_param(),
334  executor->getRowSetMemoryOwner(),
335  true)
336  : nullptr};
337  auto converter = factory.create(param);
338  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
339 
340  if (targetDescriptor->columnType.is_geometry()) {
341  // geometry columns are composites
342  // need to skip chunks, depending on geo type
343  switch (targetDescriptor->columnType.get_type()) {
344  case kMULTIPOLYGON:
345  indexOfChunk += 5;
346  break;
347  case kPOLYGON:
348  indexOfChunk += 4;
349  break;
350  case kLINESTRING:
351  indexOfChunk += 2;
352  break;
353  case kPOINT:
354  indexOfChunk += 1;
355  break;
356  default:
357  CHECK(false); // not supported
358  }
359  }
360  } else {
361  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
362  std::unique_ptr<ChunkToInsertDataConverter> converter;
363 
364  if (chunk_cd->columnType.is_fixlen_array()) {
365  converter =
366  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
367  } else if (chunk_cd->columnType.is_string()) {
368  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
369  } else if (chunk_cd->columnType.is_geometry()) {
370  // the logical geo column is a string column
371  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
372  } else {
373  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
374  }
375 
376  chunkConverters.push_back(std::move(converter));
377 
378  } else if (chunk_cd->columnType.is_date_in_days()) {
379  /* Q: Why do we need this?
380  A: In variable length updates path we move the chunk content of column
381  without decoding. Since it again passes through DateDaysEncoder
382  the expected value should be in seconds, but here it will be in days.
383  Therefore, using DateChunkConverter chunk values are being scaled to
384  seconds which then ultimately encoded in days in DateDaysEncoder.
385  */
386  std::unique_ptr<ChunkToInsertDataConverter> converter;
387  const size_t physical_size = chunk_cd->columnType.get_size();
388  if (physical_size == 2) {
389  converter =
390  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
391  } else if (physical_size == 4) {
392  converter =
393  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
394  } else {
395  CHECK(false);
396  }
397  chunkConverters.push_back(std::move(converter));
398  } else {
399  std::unique_ptr<ChunkToInsertDataConverter> converter;
400  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
401  int logical_size = logical_type.get_size();
402  int physical_size = chunk_cd->columnType.get_size();
403 
404  if (logical_type.is_string()) {
405  // for dicts -> logical = physical
406  logical_size = physical_size;
407  }
408 
409  if (8 == physical_size) {
410  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
411  num_rows, chunk.get());
412  } else if (4 == physical_size) {
413  if (8 == logical_size) {
414  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
415  num_rows, chunk.get());
416  } else {
417  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
418  num_rows, chunk.get());
419  }
420  } else if (2 == chunk_cd->columnType.get_size()) {
421  if (8 == logical_size) {
422  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
423  num_rows, chunk.get());
424  } else if (4 == logical_size) {
425  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
426  num_rows, chunk.get());
427  } else {
428  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
429  num_rows, chunk.get());
430  }
431  } else if (1 == chunk_cd->columnType.get_size()) {
432  if (8 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
434  num_rows, chunk.get());
435  } else if (4 == logical_size) {
436  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
437  num_rows, chunk.get());
438  } else if (2 == logical_size) {
439  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
440  num_rows, chunk.get());
441  } else {
442  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
443  num_rows, chunk.get());
444  }
445  } else {
446  CHECK(false); // unknown
447  }
448 
449  chunkConverters.push_back(std::move(converter));
450  }
451  }
452  }
453 
454  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
455  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
456 
457  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
458  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
459  deletedChunk->getColumnDesc()->tableId,
460  deletedChunk->getColumnDesc()->columnId,
461  fragment.fragmentId};
462  updelRoll.dirtyChunkeys.insert(chunkey);
463  bool* deletedChunkBuffer =
464  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
465 
466  std::atomic<size_t> row_idx{0};
467 
468  auto row_converter = [&sourceDataProvider,
469  &sourceDataConverters,
470  &indexOffFragmentOffsetColumn,
471  &chunkConverters,
472  &deletedChunkBuffer,
473  &row_idx](size_t indexOfEntry) -> void {
474  // convert the source data
475  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
476  if (row.empty()) {
477  return;
478  }
479 
480  size_t indexOfRow = row_idx.fetch_add(1);
481 
482  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
483  if (sourceDataConverters[col]) {
484  const auto& mapd_variant = row[col];
485  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
486  }
487  }
488 
489  auto scalar = checked_get(
490  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
491  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
492 
493  // convert the remaining chunks
494  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
495  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
496  }
497 
498  // now mark the row as deleted
499  deletedChunkBuffer[indexInChunkBuffer] = true;
500  };
501 
502  bool can_go_parallel = num_rows > 20000;
503 
504  if (can_go_parallel) {
505  const size_t num_worker_threads = cpu_threads();
506  std::vector<std::future<void>> worker_threads;
507  for (size_t i = 0,
508  start_entry = 0,
509  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
510  i < num_worker_threads && start_entry < num_entries;
511  ++i, start_entry += stride) {
512  const auto end_entry = std::min(start_entry + stride, num_rows);
513  worker_threads.push_back(std::async(
514  std::launch::async,
515  [&row_converter](const size_t start, const size_t end) {
516  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
517  row_converter(indexOfRow);
518  }
519  },
520  start_entry,
521  end_entry));
522  }
523 
524  for (auto& child : worker_threads) {
525  child.wait();
526  }
527 
528  } else {
529  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
530  row_converter(entryIdx);
531  }
532  }
533 
534  InsertData insert_data;
535  insert_data.databaseId = catalog->getCurrentDB().dbId;
536  insert_data.tableId = td->tableId;
537 
538  for (size_t i = 0; i < chunkConverters.size(); i++) {
539  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
540  continue;
541  }
542 
543  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
544  if (sourceDataConverters[i]) {
545  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
546  }
547  continue;
548  }
549 
550  insert_data.numRows = num_rows;
551  insertDataNoCheckpoint(insert_data);
552 
553  // update metadata
554  if (!deletedChunk->getBuffer()->hasEncoder()) {
555  deletedChunk->initEncoder();
556  }
557  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
558 
559  auto& shadowDeletedChunkMeta =
560  fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
561  if (shadowDeletedChunkMeta->numElements >
562  deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
563  // the append will have populated shadow meta data, otherwise use existing num
564  // elements
565  deletedChunk->getBuffer()->getEncoder()->setNumElems(
566  shadowDeletedChunkMeta->numElements);
567  }
568  deletedChunk->getBuffer()->setUpdated();
569 }
Data_Namespace::MemoryLevel memoryLevel
Definition: UpdelRoll.h:65
bool is_varlen_update
Definition: UpdelRoll.h:67
std::vector< int > ChunkKey
Definition: types.h:37
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
std::map< Chunk_NS::Chunk *, std::shared_ptr< Chunk_NS::Chunk > > dirtyChunks
Definition: UpdelRoll.h:52
std::set< ChunkKey > dirtyChunkeys
Definition: UpdelRoll.h:53
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:893
void insertDataNoCheckpoint(InsertData &insertDataStruct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
int tableId
identifies the database into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:63
static int get_chunks(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:63
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
virtual size_t const getEntryCount() const =0
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:3909
specifies the content in-memory of a row in the column metadata table
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
bool g_enable_experimental_string_functions
virtual StringDictionaryProxy * getLiteralDictionary() const =0
int logicalTableId
Definition: UpdelRoll.h:64
#define CHECK(condition)
Definition: Logger.h:197
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
bool is_string() const
Definition: sqltypes.h:478
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
int cpu_threads()
Definition: thread_count.h:24
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0

+ Here is the call graph for this function:
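
The geometry switch above advances the chunk index past the physical companion chunks that follow each logical geo column. A small helper that mirrors that dispatch, shown for illustration only (the counts are taken directly from the switch):

#include <cstddef>
#include <stdexcept>

size_t num_physical_geo_companions(const SQLTypes type) {
  switch (type) {
    case kMULTIPOLYGON:
      return 5;
    case kPOLYGON:
      return 4;
    case kLINESTRING:
      return 2;
    case kPOINT:
      return 1;
    default:
      // mirrors the CHECK(false) "not supported" branch in the original
      throw std::runtime_error("not a supported geo type");
  }
}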

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 955 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

957  {
958  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
959  if (updel_roll.chunkMetadata.count(key)) {
960  auto& fragmentInfo = *key.second;
961  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
962  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
963  fragmentInfo.setChunkMetadataMap(chunkMetadata);
964  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
965  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
966  }
967 }
std::map< MetaDataKey, ChunkMetadataMap > chunkMetadata
Definition: UpdelRoll.h:59
std::map< MetaDataKey, size_t > numTuples
Definition: UpdelRoll.h:56
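
A hedged sketch of how the staged shadow metadata might be applied once all per-fragment updates are done. The loop shape is an assumption made for illustration; in the engine the commit is driven by the UpdelRoll/checkpoint machinery rather than by callers iterating directly.

// Sketch: install the shadow chunk metadata recorded under each
// (TableDescriptor*, FragmentInfo*) key of the UpdelRoll.
void commit_staged_metadata(Fragmenter_Namespace::InsertOrderFragmenter& fragmenter,
                            const Catalog_Namespace::Catalog* catalog,
                            UpdelRoll& updel_roll) {
  for (const auto& entry : updel_roll.chunkMetadata) {
    fragmenter.updateMetadata(catalog, entry.first, updel_roll);
  }
}
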
auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1067 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1070  {
1071  const auto cd = chunk->getColumnDesc();
1072  const auto& col_type = cd->columnType;
1073  auto data_buffer = chunk->getBuffer();
1074  auto data_addr = data_buffer->getMemoryPtr();
1075  auto element_size =
1076  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1077  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1078  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1079  size_t nbytes_fix_data_to_keep = 0;
1080  auto nrows_to_vacuum = frag_offsets.size();
1081  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1082  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1083  auto is_last_one = irow == nrows_to_vacuum;
1084  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1085  auto maddr_to_vacuum = data_addr;
1086  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1087  if (nrows_to_keep > 0) {
1088  auto nbytes_to_keep = nrows_to_keep * element_size;
1089  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1090  // move curr fixlen row block toward front
1091  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1092  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1093  nbytes_to_keep);
1094  }
1095  irow_of_blk_to_fill += nrows_to_keep;
1096  nbytes_fix_data_to_keep += nbytes_to_keep;
1097  }
1098  irow_of_blk_to_keep = irow_to_vacuum + 1;
1099  }
1100  return nbytes_fix_data_to_keep;
1101 }

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
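
A standalone, self-contained illustration of the fixed-length compaction loop above: surviving row blocks between vacuumed offsets are moved toward the front with memmove and the number of kept bytes is returned. Variable names mirror the implementation; the main() driver and its data are purely illustrative, and frag_offsets is assumed to be sorted.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

size_t compact_fixlen(int8_t* data_addr,
                      const size_t element_size,
                      const size_t nrows_in_fragment,
                      const std::vector<uint64_t>& frag_offsets) {
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  const auto nrows_to_vacuum = frag_offsets.size();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    const bool is_last_one = irow == nrows_to_vacuum;
    const auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    const int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      const size_t nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move the current fixed-length row block toward the front
        std::memmove(data_addr + irow_of_blk_to_fill * element_size,
                     data_addr + irow_of_blk_to_keep * element_size,
                     nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}

int main() {
  std::vector<int32_t> rows{10, 11, 12, 13, 14, 15};
  // vacuum rows 1 and 4 (values 11 and 14)
  const size_t kept = compact_fixlen(reinterpret_cast<int8_t*>(rows.data()),
                                     sizeof(int32_t),
                                     rows.size(),
                                     {1, 4});
  for (size_t i = 0; i < kept / sizeof(int32_t); ++i) {
    std::cout << rows[i] << " ";  // prints: 10 12 13 15
  }
  std::cout << "\n";
}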

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1103 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1106  {
1107  auto data_buffer = chunk->getBuffer();
1108  auto index_buffer = chunk->getIndexBuf();
1109  auto data_addr = data_buffer->getMemoryPtr();
1110  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1111  auto index_array = (StringOffsetT*)indices_addr;
1112  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1113  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1114  size_t nbytes_fix_data_to_keep = 0;
1115  size_t nbytes_var_data_to_keep = 0;
1116  auto nrows_to_vacuum = frag_offsets.size();
1117  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1118  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1119  auto is_last_one = irow == nrows_to_vacuum;
1120  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1121  auto maddr_to_vacuum = data_addr;
1122  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1123  if (nrows_to_keep > 0) {
1124  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1125  auto nbytes_to_keep =
1126  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1127  index_array[irow_of_blk_to_keep];
1128  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1129  // move curr varlen row block toward front
1130  memmove(data_addr + ibyte_var_data_to_keep,
1131  data_addr + index_array[irow_of_blk_to_keep],
1132  nbytes_to_keep);
1133 
1134  const auto index_base = index_array[irow_of_blk_to_keep];
1135  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1136  auto& index = index_array[irow_of_blk_to_keep + i];
1137  index = ibyte_var_data_to_keep + (index - index_base);
1138  }
1139  }
1140  nbytes_var_data_to_keep += nbytes_to_keep;
1141  maddr_to_vacuum = indices_addr;
1142 
1143  constexpr static auto index_element_size = sizeof(StringOffsetT);
1144  nbytes_to_keep = nrows_to_keep * index_element_size;
1145  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1146  // move curr fixlen row block toward front
1147  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1148  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1149  nbytes_to_keep);
1150  }
1151  irow_of_blk_to_fill += nrows_to_keep;
1152  nbytes_fix_data_to_keep += nbytes_to_keep;
1153  }
1154  irow_of_blk_to_keep = irow_to_vacuum + 1;
1155  }
1156  return nbytes_var_data_to_keep;
1157 }
int32_t StringOffsetT
Definition: sqltypes.h:919

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
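
A focused, self-contained illustration of the index-rebasing step in the variable-length path above: after a payload block has been moved toward the front, each surviving string offset is rewritten relative to the new payload position. The concrete numbers are illustrative.

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Offsets of three surviving strings whose payload block used to start at
  // byte 40 but has just been memmove'd so that it now starts at byte 16.
  std::vector<int32_t> index_array{40, 45, 52};   // StringOffsetT values
  const int32_t index_base = index_array[0];      // index_array[irow_of_blk_to_keep]
  const size_t ibyte_var_data_to_keep = 16;       // payload bytes kept so far
  for (auto& index : index_array) {
    index = ibyte_var_data_to_keep + (index - index_base);
  }
  for (const auto v : index_array) {
    std::cout << v << " ";  // prints: 16 21 28
  }
  std::cout << "\n";
}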

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 180 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 182 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 184 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 190 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 208 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 232 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 207 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: