OmniSciDB  dfae7c3b14
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns a TableInfo object containing the ids and row counts of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recent fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks are taken and no checkpoint is performed; both need to be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateColumnChunkMetadata (const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
 Updates the metadata for a column chunk. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor *> columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter.

InsertOrderFragmenter

Definition at line 52 of file InsertOrderFragmenter.h.
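
A minimal construction sketch (hedged: db_id, table_id, data_mgr, and catalog are hypothetical stand-ins for values normally supplied by the catalog layer when a table is instantiated; the defaulted size parameters are spelled out only for clarity):

  std::vector<int> chunk_key_prefix{db_id, table_id};  // {database_id, table_id}
  std::vector<Chunk_NS::Chunk> chunk_vec;              // one Chunk per column, built from
                                                       // the table's column descriptors
  Fragmenter_Namespace::InsertOrderFragmenter fragmenter(
      chunk_key_prefix,
      chunk_vec,
      data_mgr,                     // Data_Namespace::DataMgr*
      catalog,                      // Catalog_Namespace::Catalog*
      table_id,                     // physicalTableId
      -1,                           // shard: -1 for unsharded tables
      DEFAULT_FRAGMENT_ROWS,
      DEFAULT_MAX_CHUNK_SIZE,
      DEFAULT_PAGE_SIZE,
      DEFAULT_MAX_ROWS,
      Data_Namespace::DISK_LEVEL);  // insertData() checkpoints only at DISK_LEVEL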

Member Typedef Documentation

◆ ModifyTransactionTracker

using Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker = UpdelRoll

Constructor & Destructor Documentation

◆ InsertOrderFragmenter() [1/2]

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)

◆ ~InsertOrderFragmenter()

Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 92 of file InsertOrderFragmenter.cpp.

92 {}

◆ InsertOrderFragmenter() [2/2]

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter &  )
protected

Member Function Documentation

◆ compactRows()

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1159 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfo(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1164  {
1165  auto fragment_ptr = getFragmentInfo(fragment_id);
1166  auto& fragment = *fragment_ptr;
1167  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1168  const auto ncol = chunks.size();
1169 
1170  std::vector<int8_t> has_null_per_thread(ncol, 0);
1171  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1172  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1173  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::min());
1174  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::max());
1175 
1176  // parallel delete columns
1177  std::vector<std::future<void>> threads;
1178  auto nrows_to_vacuum = frag_offsets.size();
1179  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1180  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1181 
1182  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1183  auto chunk = chunks[ci];
1184  const auto cd = chunk->getColumnDesc();
1185  const auto& col_type = cd->columnType;
1186  auto data_buffer = chunk->getBuffer();
1187  auto index_buffer = chunk->getIndexBuf();
1188  auto data_addr = data_buffer->getMemoryPtr();
1189  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1190  auto index_array = (StringOffsetT*)indices_addr;
1191  bool is_varlen = col_type.is_varlen_indeed();
1192 
1193  auto fixlen_vacuum = [=,
1194  &has_null_per_thread,
1195  &max_double_per_thread,
1196  &min_double_per_thread,
1197  &min_int64t_per_thread,
1198  &max_int64t_per_thread,
1199  &updel_roll,
1200  &frag_offsets,
1201  &fragment] {
1202  size_t nbytes_fix_data_to_keep;
1203  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1204 
1205  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1206  data_buffer->setSize(nbytes_fix_data_to_keep);
1207  data_buffer->setUpdated();
1208 
1209  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1210 
1211  auto daddr = data_addr;
1212  auto element_size =
1213  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1214  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1215  if (col_type.is_fixlen_array()) {
1216  auto encoder =
1217  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1218  CHECK(encoder);
1219  encoder->updateMetadata((int8_t*)daddr);
1220  } else if (col_type.is_fp()) {
1221  set_chunk_stats(col_type,
1222  data_addr,
1223  has_null_per_thread[ci],
1224  min_double_per_thread[ci],
1225  max_double_per_thread[ci]);
1226  } else {
1227  set_chunk_stats(col_type,
1228  data_addr,
1229  has_null_per_thread[ci],
1230  min_int64t_per_thread[ci],
1231  max_int64t_per_thread[ci]);
1232  }
1233  }
1234  };
1235 
1236  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1237  size_t nbytes_var_data_to_keep;
1238  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1239 
1240  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1241  data_buffer->setSize(nbytes_var_data_to_keep);
1242  data_buffer->setUpdated();
1243 
1244  index_array[nrows_to_keep] = data_buffer->size();
1245  index_buffer->setSize(sizeof(*index_array) *
1246  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1247  index_buffer->setUpdated();
1248 
1249  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1250  };
1251 
1252  if (is_varlen) {
1253  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1254  } else {
1255  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1256  }
1257  if (threads.size() >= (size_t)cpu_threads()) {
1258  wait_cleanup_threads(threads);
1259  }
1260  }
1261 
1262  wait_cleanup_threads(threads);
1263 
1264  auto key = std::make_pair(td, &fragment);
1265  updel_roll.numTuples[key] = nrows_to_keep;
1266  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1267  auto chunk = chunks[ci];
1268  auto cd = chunk->getColumnDesc();
1269  if (!cd->columnType.is_fixlen_array()) {
1270  updateColumnMetadata(cd,
1271  fragment,
1272  chunk,
1273  has_null_per_thread[ci],
1274  max_double_per_thread[ci],
1275  min_double_per_thread[ci],
1276  max_int64t_per_thread[ci],
1277  min_int64t_per_thread[ci],
1278  cd->columnType,
1279  updel_roll);
1280  }
1281  }
1282 }

◆ createNewFragment()

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 694 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

695  {
696  // also sets the new fragment as the insertBuffer for each column
697 
698  maxFragmentId_++;
699  auto newFragmentInfo = std::make_unique<FragmentInfo>();
700  newFragmentInfo->fragmentId = maxFragmentId_;
701  newFragmentInfo->shadowNumTuples = 0;
702  newFragmentInfo->setPhysicalNumTuples(0);
703  for (const auto levelSize : dataMgr_->levelSizes_) {
704  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
705  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
706  }
707  newFragmentInfo->physicalTableId = physicalTableId_;
708  newFragmentInfo->shard = shard_;
709 
710  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
711  colMapIt != columnMap_.end();
712  ++colMapIt) {
713  ChunkKey chunkKey = chunkKeyPrefix_;
714  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
715  chunkKey.push_back(maxFragmentId_);
716  colMapIt->second.createChunkBuffer(
717  dataMgr_,
718  chunkKey,
719  memoryLevel,
720  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
721  pageSize_);
722  colMapIt->second.initEncoder();
723  }
724 
725  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
726  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
727  return fragmentInfoVec_.back().get();
728 }

◆ deleteFragments()

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 230 of file InsertOrderFragmenter.cpp.

References lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

230  {
231  // Fix a verified loophole on sharded logical table which is locked using logical
232  // tableId while it's its physical tables that can come here when fragments overflow
233  // during COPY. Locks on a logical table and its physical tables never intersect, which
234  // means potential races. It'll be an overkill to resolve a logical table to physical
235  // tables in DBHandler, ParseNode or other higher layers where the logical table is
236  // locked with Table Read/Write locks; it's easier to lock the logical table of its
237  // physical tables. A downside of this approach may be loss of parallel execution of
238  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
239  // operation, the loss seems not a big deal.
240  auto chunkKeyPrefix = chunkKeyPrefix_;
241  if (shard_ >= 0) {
242  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
243  }
244 
245  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
246  // SELECT and COPY may enter a deadlock
247  const auto delete_lock =
248  lockmgr::TableDataLockMgr::getWriteLockForTable(chunkKeyPrefix);
249 
250  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
251 
252  for (const auto fragId : dropFragIds) {
253  for (const auto& col : columnMap_) {
254  int colId = col.first;
255  vector<int> fragPrefix = chunkKeyPrefix_;
256  fragPrefix.push_back(colId);
257  fragPrefix.push_back(fragId);
258  dataMgr_->deleteChunksWithPrefix(fragPrefix);
259  }
260  }
261 }

◆ dropColumns()

void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 489 of file InsertOrderFragmenter.cpp.

489  {
490  // prevent concurrent insert rows and drop column
491  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
492  // synchronize concurrent accesses to fragmentInfoVec_
493  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
494  for (auto const& fragmentInfo : fragmentInfoVec_) {
495  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
496  }
497 
498  for (const auto columnId : columnIds) {
499  auto cit = columnMap_.find(columnId);
500  if (columnMap_.end() != cit) {
501  columnMap_.erase(cit);
502  }
503 
504  vector<int> fragPrefix = chunkKeyPrefix_;
505  fragPrefix.push_back(columnId);
506  dataMgr_->deleteChunksWithPrefix(fragPrefix);
507 
508  for (const auto& fragmentInfo : fragmentInfoVec_) {
509  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
510  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
511  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
512  }
513  }
514  }
515  for (const auto& fragmentInfo : fragmentInfoVec_) {
516  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
517  }
518 }

◆ dropFragmentsToSize()

void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 203 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

203  {
204  // not safe to call from outside insertData
205  // b/c depends on insertLock around numTuples_
206 
207  // don't ever drop the only fragment!
208  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
209  return;
210  }
211 
212  if (numTuples_ > maxRows) {
213  size_t preNumTuples = numTuples_;
214  vector<int> dropFragIds;
215  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
216  while (numTuples_ > targetRows) {
217  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
218  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
219  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
220  fragmentInfoVec_.pop_front();
221  CHECK_GE(numTuples_, numFragTuples);
222  numTuples_ -= numFragTuples;
223  }
224  deleteFragments(dropFragIds);
225  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
226  << " post: " << numTuples_ << " maxRows: " << maxRows;
227  }
228 }
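A worked example of the truncation loop above, assuming for illustration DROP_FRAGMENT_FACTOR = 0.97: with maxRows = 100,000, numTuples_ = 150,000, and fragments holding 32,000 rows each, targetRows = 97,000, so the loop pops the two oldest fragments (150,000 → 118,000 → 86,000 tuples) and stops once numTuples_ ≤ targetRows. Dropping below maxRows by the extra factor presumably provides hysteresis, so the very next insert does not immediately re-trigger truncation.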

◆ getChunkKeyPrefix()

std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 107 of file InsertOrderFragmenter.h.

◆ getChunkMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 112 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GE, Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, and to_string().

112  {
113  if (uses_foreign_storage_ ||
114  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
115  // memory-resident tables won't have anything on disk
116  ChunkMetadataVector chunk_metadata;
117  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
118 
119  // data comes like this - database_id, table_id, column_id, fragment_id
120  // but lets sort by database_id, table_id, fragment_id, column_id
121 
122  int fragment_subkey_index = 3;
123  std::sort(chunk_metadata.begin(),
124  chunk_metadata.end(),
125  [&](const auto& pair1, const auto& pair2) {
126  return pair1.first[3] < pair2.first[3];
127  });
128 
129  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
130  ++chunk_itr) {
131  int cur_column_id = chunk_itr->first[2];
132  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
133 
134  if (fragmentInfoVec_.empty() ||
135  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
136  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
137  CHECK(new_fragment_info);
138  maxFragmentId_ = cur_fragment_id;
139  new_fragment_info->fragmentId = cur_fragment_id;
140  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
141  numTuples_ += new_fragment_info->getPhysicalNumTuples();
142  for (const auto level_size : dataMgr_->levelSizes_) {
143  new_fragment_info->deviceIds.push_back(
144  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
145  }
146  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
147  new_fragment_info->physicalTableId = physicalTableId_;
148  new_fragment_info->shard = shard_;
149  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
150  } else {
151  if (chunk_itr->second->numElements !=
152  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
153  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
154  std::to_string(physicalTableId_) + ", Column " +
155  std::to_string(cur_column_id) + ". Fragment Tuples: " +
156  std::to_string(
157  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
158  ", Chunk Tuples: " +
159  std::to_string(chunk_itr->second->numElements);
160  }
161  }
162  CHECK(fragmentInfoVec_.back().get());
163  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
164  }
165  }
166 
167  size_t maxFixedColSize = 0;
168 
169  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
170  auto size = colIt->second.getColumnDesc()->columnType.get_size();
171  if (size == -1) { // variable length
172  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
173  size = 8; // b/c we use this for string and array indices - gross to have magic
174  // number here
175  }
176  CHECK_GE(size, 0);
177  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
178  }
179 
180  // this is maximum number of rows assuming everything is fixed length
181  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
182 
183  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
184  // Now need to get the insert buffers for each column - should be last
185  // fragment
186  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
187  // TODO: add accessor here for safe indexing
188  int deviceId =
189  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
190  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
191  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
192  insertKey.push_back(colIt->first); // column id
193  insertKey.push_back(lastFragmentId); // fragment id
194  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
195  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
196  if (varLenColInfoIt != varLenColInfo_.end()) {
197  varLenColInfoIt->second = colIt->second.getBuffer()->size();
198  }
199  }
200  }
201 }

◆ getChunksForAllColumns()

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 969 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

972  {
973  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
974  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
975  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
976  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
977  ++ncol;
978  if (!cd->isVirtualCol) {
979  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
980  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
981  ChunkKey chunk_key{
982  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
983  auto chunk = Chunk_NS::Chunk::getChunk(cd,
984  &catalog_->getDataMgr(),
985  chunk_key,
986  memory_level,
987  0,
988  chunk_meta_it->second->numBytes,
989  chunk_meta_it->second->numElements);
990  chunks.push_back(chunk);
991  }
992  }
993  }
994  return chunks;
995 }

◆ getFragmenterId()

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 106 of file InsertOrderFragmenter.h.

106 { return chunkKeyPrefix_.back(); }

◆ getFragmenterType()

std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 111 of file InsertOrderFragmenter.h.

◆ getFragmentInfo()

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 366 of file InsertOrderFragmenter.cpp.

References CHECK.

Referenced by compactRows(), updateColumn(), and updateColumns().

366  {
367  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
368  fragmentInfoVec_.end(),
369  [fragment_id](const auto& fragment) -> bool {
370  return fragment->fragmentId == fragment_id;
371  });
372  CHECK(fragment_it != fragmentInfoVec_.end());
373  return fragment_it->get();
374 }

◆ getFragmentInfoFromId()

FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected

◆ getFragmentsForQuery()

TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns a TableInfo object containing the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 730 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

730  {
731  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
732  TableInfo queryInfo;
733  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
734  // right now we don't test predicate, so just return (copy of) all fragments
735  bool fragmentsExist = false;
736  if (fragmentInfoVec_.empty()) {
737  // If we have no fragments add a dummy empty fragment to make the executor
738  // not have separate logic for 0-row tables
739  int maxFragmentId = 0;
740  FragmentInfo emptyFragmentInfo;
741  emptyFragmentInfo.fragmentId = maxFragmentId;
742  emptyFragmentInfo.shadowNumTuples = 0;
743  emptyFragmentInfo.setPhysicalNumTuples(0);
744  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
745  emptyFragmentInfo.physicalTableId = physicalTableId_;
746  emptyFragmentInfo.shard = shard_;
747  queryInfo.fragments.push_back(emptyFragmentInfo);
748  } else {
749  fragmentsExist = true;
750  std::for_each(
751  fragmentInfoVec_.begin(),
752  fragmentInfoVec_.end(),
753  [&queryInfo](const auto& fragment_owned_ptr) {
754  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
755  });
756  }
757  readLock.unlock();
758  queryInfo.setPhysicalNumTuples(0);
759  auto partIt = queryInfo.fragments.begin();
760  if (fragmentsExist) {
761  while (partIt != queryInfo.fragments.end()) {
762  if (partIt->getPhysicalNumTuples() == 0) {
763  // this means that a concurrent insert query inserted tuples into a new fragment
764  // but when the query came in we didn't have this fragment. To make sure we don't
765  // mess up the executor we delete this fragment from the metadatamap (fixes
766  // earlier bug found 2015-05-08)
767  partIt = queryInfo.fragments.erase(partIt);
768  } else {
769  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
770  partIt->getPhysicalNumTuples());
771  ++partIt;
772  }
773  }
774  } else {
775  // We added a dummy fragment and know the table is empty
776  queryInfo.setPhysicalNumTuples(0);
777  }
778  return queryInfo;
779 }
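A hedged usage sketch (process_fragment is a hypothetical consumer; fragments and getPhysicalNumTuples() are the TableInfo/FragmentInfo members referenced above):

  auto query_info = fragmenter.getFragmentsForQuery();
  for (const auto& fragment : query_info.fragments) {
    // each copied FragmentInfo carries its id and physical row count
    process_fragment(fragment.fragmentId, fragment.getPhysicalNumTuples());
  }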

◆ getNumRows()

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual

◆ getVacuumOffsets()

const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 998 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

999  {
1000  const auto data_buffer = chunk->getBuffer();
1001  const auto data_addr = data_buffer->getMemoryPtr();
1002  const size_t nrows_in_chunk = data_buffer->size();
1003  const size_t ncore = cpu_threads();
1004  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1005  std::vector<std::vector<uint64_t>> deleted_offsets;
1006  deleted_offsets.resize(ncore);
1007  std::vector<std::future<void>> threads;
1008  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1009  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1010  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1011  const auto ithread = rbegin / segsz;
1012  CHECK(ithread < deleted_offsets.size());
1013  deleted_offsets[ithread].reserve(segsz);
1014  for (size_t r = rbegin; r < rend; ++r) {
1015  if (data_addr[r]) {
1016  deleted_offsets[ithread].push_back(r);
1017  }
1018  }
1019  }));
1020  }
1021  wait_cleanup_threads(threads);
1022  std::vector<uint64_t> all_deleted_offsets;
1023  for (size_t i = 0; i < ncore; ++i) {
1024  all_deleted_offsets.insert(
1025  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1026  }
1027  return all_deleted_offsets;
1028 }
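A worked example of the partitioning arithmetic above: with nrows_in_chunk = 10 and ncore = 4, segsz = (10 + 4 - 1) / 4 = 3, so the async tasks scan offset ranges [0,3), [3,6), [6,9), and [9,10). Each task appends only to its own deleted_offsets[ithread] vector, which is why the per-thread results can be concatenated at the end without any locking.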

◆ hasDeletedRows()

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 520 of file InsertOrderFragmenter.cpp.

References CHECK.

520  {
521  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
522 
523  for (auto const& fragment : fragmentInfoVec_) {
524  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
525  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
526  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
527  if (chunk_stats.max.tinyintval == 1) {
528  return true;
529  }
530  }
531  return false;
532 }

◆ insertData()

void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
overridevirtual

appends data onto the most recent fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 376 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData(), and Fragmenter_Namespace::ChunkToInsertDataConverter::~ChunkToInsertDataConverter().

376  {
377  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
378  try {
379  mapd_unique_lock<mapd_shared_mutex> insertLock(
380  insertMutex_); // prevent two threads from trying to insert into the same table
381  // simultaneously
382 
383  insertDataImpl(insertDataStruct);
384 
385  if (defaultInsertLevel_ ==
386  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
387  dataMgr_->checkpoint(
388  chunkKeyPrefix_[0],
389  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
390  }
391  } catch (...) {
392  int32_t tableEpoch =
393  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
394 
395  // the statement below deletes *this* object!
396  // relying on exception propagation at this stage
397  // until we can sort this out in a cleaner fashion
398  catalog_->setTableEpoch(
399  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
400  throw;
401  }
402 }
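A hedged caller sketch (assumes a table whose single BIGINT column has columnId 1; db_id and table_id are hypothetical; the InsertData fields are those listed under References above):

  Fragmenter_Namespace::InsertData insert_data;
  insert_data.databaseId = db_id;
  insert_data.tableId = table_id;
  insert_data.columnIds.push_back(1);
  insert_data.replicate_count = 0;  // normal append, not the ALTER TABLE ADD COLUMN path
  std::vector<int64_t> values{10, 20, 30};
  DataBlockPtr block;
  block.numbersPtr = reinterpret_cast<int8_t*>(values.data());
  insert_data.data.push_back(block);
  insert_data.numRows = values.size();
  fragmenter.insertData(insert_data);  // takes insertMutex_, checkpoints if on disk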

◆ insertDataImpl()

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 534 of file InsertOrderFragmenter.cpp.

References CHECK, CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

534  {
535  // populate deleted system column if it should exists, as it will not come from client
536  // Do not add this magical column in the replicate ALTER TABLE ADD route as
537  // it is not needed and will cause issues
538  std::unique_ptr<int8_t[]> data_for_deleted_column;
539  for (const auto& cit : columnMap_) {
540  if (cit.second.getColumnDesc()->isDeletedCol &&
541  insertDataStruct.replicate_count == 0) {
542  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
543  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
544  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
545  insertDataStruct.columnIds.push_back(cit.second.getColumnDesc()->columnId);
546  break;
547  }
548  }
549  // MAT we need to add a removal of the empty column we pushed onto here
550  // for upstream safety. Should not be a problem but need to fix.
551 
552  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
553  for (const auto columnId : insertDataStruct.columnIds) {
554  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
555  CHECK(columnDesc);
556  if (columnMap_.end() == columnMap_.find(columnId)) {
557  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
558  }
559  }
560 
561  // when replicate (add) column(s), this path seems wont work; go separate route...
562  if (insertDataStruct.replicate_count > 0) {
563  replicateData(insertDataStruct);
564  return;
565  }
566 
567  std::unordered_map<int, int> inverseInsertDataColIdMap;
568  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
569  inverseInsertDataColIdMap.insert(
570  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
571  }
572 
573  size_t numRowsLeft = insertDataStruct.numRows;
574  size_t numRowsInserted = 0;
575  vector<DataBlockPtr> dataCopy =
576  insertDataStruct.data; // bc append data will move ptr forward and this violates
577  // constness of InsertData
578  if (numRowsLeft <= 0) {
579  return;
580  }
581 
582  FragmentInfo* currentFragment{nullptr};
583 
584  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
585  currentFragment = createNewFragment(defaultInsertLevel_);
586  } else {
587  currentFragment = fragmentInfoVec_.back().get();
588  }
589  CHECK(currentFragment);
590 
591  size_t startFragment = fragmentInfoVec_.size() - 1;
592 
593  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
594  // loop until done inserting all rows
595  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
596  size_t rowsLeftInCurrentFragment =
597  maxFragmentRows_ - currentFragment->shadowNumTuples;
598  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
599  if (rowsLeftInCurrentFragment != 0) {
600  for (auto& varLenColInfoIt : varLenColInfo_) {
601  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
602  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
603  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
604  if (insertIdIt != inverseInsertDataColIdMap.end()) {
605  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
606  numRowsToInsert = std::min(
607  numRowsToInsert,
608  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
609  numRowsToInsert,
610  numRowsInserted,
611  bytesLeft));
612  }
613  }
614  }
615 
616  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
617  currentFragment = createNewFragment(defaultInsertLevel_);
618  if (numRowsInserted == 0) {
619  startFragment++;
620  }
621  rowsLeftInCurrentFragment = maxFragmentRows_;
622  for (auto& varLenColInfoIt : varLenColInfo_) {
623  varLenColInfoIt.second = 0; // reset byte counter
624  }
625  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
626  for (auto& varLenColInfoIt : varLenColInfo_) {
627  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
628  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
629  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
630  if (insertIdIt != inverseInsertDataColIdMap.end()) {
631  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
632  numRowsToInsert = std::min(
633  numRowsToInsert,
634  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
635  numRowsToInsert,
636  numRowsInserted,
637  bytesLeft));
638  }
639  }
640  }
641 
642  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
643  // never be able to insert anything
644 
645  // for each column, append the data in the appropriate insert buffer
646  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
647  int columnId = insertDataStruct.columnIds[i];
648  auto colMapIt = columnMap_.find(columnId);
649  CHECK(colMapIt != columnMap_.end());
650  currentFragment->shadowChunkMetadataMap[columnId] =
651  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
652  auto varLenColInfoIt = varLenColInfo_.find(columnId);
653  if (varLenColInfoIt != varLenColInfo_.end()) {
654  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
655  }
656  }
657  if (hasMaterializedRowId_) {
658  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
659  currentFragment->shadowNumTuples;
660  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
661  for (size_t i = 0; i < numRowsToInsert; ++i) {
662  row_id_data[i] = i + startId;
663  }
664  DataBlockPtr rowIdBlock;
665  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
666  auto colMapIt = columnMap_.find(rowIdColId_);
667  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
668  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
669  }
670 
671  currentFragment->shadowNumTuples =
672  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
673  numRowsLeft -= numRowsToInsert;
674  numRowsInserted += numRowsToInsert;
675  }
676  {
677  // Only take the fragment info lock when updating fragment info map. Otherwise,
678  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
679  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
680  // COPY_FROM waits for TableWriteLock
681  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
682  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
683  partIt != fragmentInfoVec_.end();
684  ++partIt) {
685  auto fragment_ptr = partIt->get();
686  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
687  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
688  }
689  }
690  numTuples_ += insertDataStruct.numRows;
691  dropFragmentsToSize(maxRows_);
692 }

◆ insertDataNoCheckpoint()

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks are taken and no checkpoint is performed; both need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 404 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

404  {
405  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
406  mapd_unique_lock<mapd_shared_mutex> insertLock(
407  insertMutex_); // prevent two threads from trying to insert into the same table
408  // simultaneously
409  insertDataImpl(insertDataStruct);
410 }
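A sketch of the external management this variant expects (hedged: batch1, batch2, db_id, and table_id are hypothetical; the checkpoint call is the DataMgr method used by insertData() above):

  // caller already holds the appropriate table write lock
  fragmenter.insertDataNoCheckpoint(batch1);
  fragmenter.insertDataNoCheckpoint(batch2);
  // ... then checkpoint once, instead of once per batch
  catalog->getDataMgr().checkpoint(db_id, table_id);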

◆ lockInsertCheckpointData()

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected

◆ operator=()

InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter &  )
protected

◆ replicateData()

void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 412 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, CHECK, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

412  {
413  // synchronize concurrent accesses to fragmentInfoVec_
414  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
415  size_t numRowsLeft = insertDataStruct.numRows;
416  for (auto const& fragmentInfo : fragmentInfoVec_) {
417  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
418  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
419  size_t numRowsCanBeInserted;
420  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
421  if (insertDataStruct.bypass[i]) {
422  continue;
423  }
424  auto columnId = insertDataStruct.columnIds[i];
425  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
426  CHECK(colDesc);
427  CHECK(columnMap_.find(columnId) != columnMap_.end());
428 
429  ChunkKey chunkKey = chunkKeyPrefix_;
430  chunkKey.push_back(columnId);
431  chunkKey.push_back(fragmentInfo->fragmentId);
432 
433  auto colMapIt = columnMap_.find(columnId);
434  auto& chunk = colMapIt->second;
435  if (chunk.isChunkOnDevice(
436  dataMgr_,
437  chunkKey,
438  defaultInsertLevel_,
439  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
440  dataMgr_->deleteChunksWithPrefix(chunkKey);
441  }
442  chunk.createChunkBuffer(
443  dataMgr_,
444  chunkKey,
445  defaultInsertLevel_,
446  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
447  chunk.initEncoder();
448 
449  try {
450  DataBlockPtr dataCopy = insertDataStruct.data[i];
451  auto size = colDesc->columnType.get_size();
452  if (0 > size) {
453  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
454  varLenColInfo_[columnId] = 0;
455  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
456  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
457  } else {
458  numRowsCanBeInserted = maxChunkSize_ / size;
459  }
460 
461  // FIXME: abort a case in which new column is wider than existing columns
462  if (numRowsCanBeInserted < numRowsToInsert) {
463  throw std::runtime_error("new column '" + colDesc->columnName +
464  "' wider than existing columns is not supported");
465  }
466 
467  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
468  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
469 
470  // update total size of var-len column in (actually the last) fragment
471  if (0 > size) {
472  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
473  varLenColInfo_[columnId] = chunk.getBuffer()->size();
474  }
475  } catch (...) {
476  dataMgr_->deleteChunksWithPrefix(chunkKey);
477  throw;
478  }
479  }
480  numRowsLeft -= numRowsToInsert;
481  }
482  CHECK(0 == numRowsLeft);
483 
484  for (auto const& fragmentInfo : fragmentInfoVec_) {
485  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
486  }
487 }

◆ setNumRows()

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual

◆ updateChunkStats()

void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 276 of file InsertOrderFragmenter.cpp.

References CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, show_chunk(), VLOG, and logger::WARNING.

278  {
279  // synchronize concurrent accesses to fragmentInfoVec_
280  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
281  /**
282  * WARNING: This method is entirely unlocked. Higher level locks are expected to
283  * prevent any table read or write during a chunk metadata update, since we need
284  * to modify various buffers and metadata maps.
285  */
286  if (shard_ >= 0) {
287  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
288  }
289 
290  CHECK(cd);
291  const auto column_id = cd->columnId;
292  const auto col_itr = columnMap_.find(column_id);
293  CHECK(col_itr != columnMap_.end());
294 
295  for (auto const& fragment : fragmentInfoVec_) {
296  auto stats_itr = stats_map.find(fragment->fragmentId);
297  if (stats_itr != stats_map.end()) {
298  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
299  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
300  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
301  physicalTableId_,
302  column_id,
303  fragment->fragmentId};
304  auto chunk = Chunk_NS::Chunk::getChunk(cd,
305  &catalog_->getDataMgr(),
306  chunk_key,
307  Data_Namespace::DISK_LEVEL,
308  0,
309  chunk_meta_it->second->numBytes,
310  chunk_meta_it->second->numElements);
311  auto buf = chunk->getBuffer();
312  CHECK(buf);
313  if (!buf->hasEncoder()) {
314  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
315  }
316  auto encoder = buf->getEncoder();
317 
318  auto chunk_stats = stats_itr->second;
319 
320  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
321  encoder->getMetadata(old_chunk_metadata);
322  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
323 
324  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
325  // Use the logical type to display data, since the encoding should be ignored
326  const auto logical_ti = cd->columnType.is_dict_encoded_string()
327  ? SQLTypeInfo(kBIGINT, false)
328  : get_logical_type_info(cd->columnType);
329  if (!didResetStats) {
330  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
331  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
332  << DatumToString(chunk_stats.max, logical_ti);
333  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
334  << DatumToString(chunk_stats.min, logical_ti);
335  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
336  continue; // move to next fragment
337  }
338 
339  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
340  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
341  << DatumToString(chunk_stats.max, logical_ti);
342  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
343  << DatumToString(chunk_stats.min, logical_ti);
344  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
345 
346  // Reset fragment metadata map and set buffer to dirty
347  auto new_metadata = std::make_shared<ChunkMetadata>();
348  // Run through fillChunkStats to ensure any transformations to the raw metadata
349  // values get applied (e.g. for date in days)
350  encoder->getMetadata(new_metadata);
351 
352  fragment->setChunkMetadata(column_id, new_metadata);
353  fragment->shadowChunkMetadataMap =
354  fragment->getChunkMetadataMap(); // TODO(adb): needed?
355  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
356  buf->setDirty();
357  }
358  } else {
359  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
360  << ", table " << physicalTableId_ << ", "
361  << ", column " << column_id;
362  }
363  }
364 }

◆ updateColumn() [1/2]

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 571 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

579  {
580  updel_roll.catalog = catalog;
581  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
582  updel_roll.memoryLevel = memory_level;
583 
584  const size_t ncore = cpu_threads();
585  const auto nrow = frag_offsets.size();
586  const auto n_rhs_values = rhs_values.size();
587  if (0 == nrow) {
588  return;
589  }
590  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
591 
592  auto fragment_ptr = getFragmentInfo(fragment_id);
593  auto& fragment = *fragment_ptr;
594  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
595  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
596  ChunkKey chunk_key{
597  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
598  auto chunk = Chunk_NS::Chunk::getChunk(cd,
599  &catalog->getDataMgr(),
600  chunk_key,
601  Data_Namespace::CPU_LEVEL,
602  0,
603  chunk_meta_it->second->numBytes,
604  chunk_meta_it->second->numElements);
605 
606  std::vector<int8_t> has_null_per_thread(ncore, 0);
607  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
608  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
609  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
610  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
611 
612  // parallel update elements
613  std::vector<std::future<void>> threads;
614 
615  const auto segsz = (nrow + ncore - 1) / ncore;
616  auto dbuf = chunk->getBuffer();
617  auto dbuf_addr = dbuf->getMemoryPtr();
618  dbuf->setUpdated();
619  {
620  std::lock_guard<std::mutex> lck(updel_roll.mutex);
621  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
622  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
623  }
624 
625  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
626  cd->tableId,
627  cd->columnId,
628  fragment.fragmentId};
629  updel_roll.dirtyChunkeys.insert(chunkey);
630  }
631  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
632  threads.emplace_back(std::async(
633  std::launch::async,
634  [=,
635  &has_null_per_thread,
636  &min_int64t_per_thread,
637  &max_int64t_per_thread,
638  &min_double_per_thread,
639  &max_double_per_thread,
640  &frag_offsets,
641  &rhs_values] {
642  SQLTypeInfo lhs_type = cd->columnType;
643 
644  // NOTE: possibly an undocumented convention rather than a bug: for a sharded
645  // table, the dictionary id of a dict-encoded string column is specified by
646  // comp_param on the logical table, not the physical table. comp_param on the
647  // physical table is always 0, so adapt accordingly.
648  auto cdl = (shard_ < 0)
649  ? cd
650  : catalog->getMetadataForColumn(
651  catalog->getLogicalTableId(td->tableId), cd->columnId);
652  CHECK(cdl);
653  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
654  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
655  lhs_type, &decimalOverflowValidator);
656  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
657  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
658  lhs_type, &dateDaysOverflowValidator);
659 
660  StringDictionary* stringDict{nullptr};
661  if (lhs_type.is_string()) {
662  CHECK(kENCODING_DICT == lhs_type.get_compression());
663  auto dictDesc = const_cast<DictDescriptor*>(
664  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
665  CHECK(dictDesc);
666  stringDict = dictDesc->stringDict.get();
667  CHECK(stringDict);
668  }
669 
670  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
671  const auto roffs = frag_offsets[r];
672  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
673  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
674  ScalarTargetValue sv2;
675 
676  // A subtlety arises in two string-to-string assignment cases, when
677  // upstream passes the RHS string as a string index instead of the
678  // preferred "real string".
679  // case #1. For "SET str_col = str_literal", a temporary string index
680  // cannot be resolved
681  // in this layer, so if upstream passes a string index here, an
682  // exception is thrown.
683  // case #2. For "SET str_col1 = str_col2", the RHS string index is
684  // converted to an LHS string index.
685  if (rhs_type.is_string()) {
686  if (const auto vp = boost::get<int64_t>(sv)) {
687  auto dictDesc = const_cast<DictDescriptor*>(
688  catalog->getMetadataForDict(rhs_type.get_comp_param()));
689  if (nullptr == dictDesc) {
690  throw std::runtime_error(
691  "UPDATE does not support cast from string literal to string "
692  "column.");
693  }
694  auto stringDict = dictDesc->stringDict.get();
695  CHECK(stringDict);
696  sv2 = NullableString(stringDict->getString(*vp));
697  sv = &sv2;
698  }
699  }
700 
701  if (const auto vp = boost::get<int64_t>(sv)) {
702  auto v = *vp;
703  if (lhs_type.is_string()) {
704  throw std::runtime_error("UPDATE does not support cast to string.");
705  }
706  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
707  if (lhs_type.is_decimal()) {
708  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
709  int64_t decimal_val;
710  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
711  tabulate_metadata(lhs_type,
712  min_int64t_per_thread[c],
713  max_int64t_per_thread[c],
714  has_null_per_thread[c],
715  (v == inline_int_null_value<int64_t>() &&
716  lhs_type.get_notnull() == false)
717  ? v
718  : decimal_val);
719  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
720  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
721  if (positive_v_and_negative_d || negative_v_and_positive_d) {
722  throw std::runtime_error(
723  "Data conversion overflow on " + std::to_string(v) +
724  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
725  std::to_string(rhs_type.get_scale()) + ") to (" +
726  std::to_string(lhs_type.get_dimension()) + ", " +
727  std::to_string(lhs_type.get_scale()) + ")");
728  }
729  } else if (is_integral(lhs_type)) {
730  if (lhs_type.is_date_in_days()) {
731  // Store meta values in seconds
732  if (lhs_type.get_size() == 2) {
733  nullAwareDateOverflowValidator.validate<int16_t>(v);
734  } else {
735  nullAwareDateOverflowValidator.validate<int32_t>(v);
736  }
737  int64_t days;
738  get_scalar<int64_t>(data_ptr, lhs_type, days);
739  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
740  tabulate_metadata(lhs_type,
741  min_int64t_per_thread[c],
742  max_int64t_per_thread[c],
743  has_null_per_thread[c],
744  (v == inline_int_null_value<int64_t>() &&
745  lhs_type.get_notnull() == false)
746  ? NullSentinelSupplier()(lhs_type, v)
747  : seconds);
748  } else {
749  int64_t target_value;
750  if (rhs_type.is_decimal()) {
751  target_value = round(decimal_to_double(rhs_type, v));
752  } else {
753  target_value = v;
754  }
755  tabulate_metadata(lhs_type,
756  min_int64t_per_thread[c],
757  max_int64t_per_thread[c],
758  has_null_per_thread[c],
759  target_value);
760  }
761  } else {
762  tabulate_metadata(
763  lhs_type,
764  min_double_per_thread[c],
765  max_double_per_thread[c],
766  has_null_per_thread[c],
767  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
768  }
769  } else if (const auto vp = boost::get<double>(sv)) {
770  auto v = *vp;
771  if (lhs_type.is_string()) {
772  throw std::runtime_error("UPDATE does not support cast to string.");
773  }
774  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
775  if (lhs_type.is_integer()) {
776  tabulate_metadata(lhs_type,
777  min_int64t_per_thread[c],
778  max_int64t_per_thread[c],
779  has_null_per_thread[c],
780  int64_t(v));
781  } else if (lhs_type.is_fp()) {
782  tabulate_metadata(lhs_type,
783  min_double_per_thread[c],
784  max_double_per_thread[c],
785  has_null_per_thread[c],
786  double(v));
787  } else {
788  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
789  "LHS with a floating RHS.";
790  }
791  } else if (const auto vp = boost::get<float>(sv)) {
792  auto v = *vp;
793  if (lhs_type.is_string()) {
794  throw std::runtime_error("UPDATE does not support cast to string.");
795  }
796  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
797  if (lhs_type.is_integer()) {
798  tabulate_metadata(lhs_type,
799  min_int64t_per_thread[c],
800  max_int64t_per_thread[c],
801  has_null_per_thread[c],
802  int64_t(v));
803  } else {
804  tabulate_metadata(lhs_type,
805  min_double_per_thread[c],
806  max_double_per_thread[c],
807  has_null_per_thread[c],
808  double(v));
809  }
810  } else if (const auto vp = boost::get<NullableString>(sv)) {
811  const auto s = boost::get<std::string>(vp);
812  const auto sval = s ? *s : std::string("");
813  if (lhs_type.is_string()) {
814  decltype(stringDict->getOrAdd(sval)) sidx;
815  {
816  std::unique_lock<std::mutex> lock(temp_mutex_);
817  sidx = stringDict->getOrAdd(sval);
818  }
819  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
820  tabulate_metadata(lhs_type,
821  min_int64t_per_thread[c],
822  max_int64t_per_thread[c],
823  has_null_per_thread[c],
824  int64_t(sidx));
825  } else if (sval.size() > 0) {
826  auto dval = std::atof(sval.data());
827  if (lhs_type.is_boolean()) {
828  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
829  } else if (lhs_type.is_time()) {
830  throw std::runtime_error(
831  "Date/Time/Timestamp update not supported through translated "
832  "string path.");
833  }
834  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
835  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
836  tabulate_metadata(lhs_type,
837  min_double_per_thread[c],
838  max_double_per_thread[c],
839  has_null_per_thread[c],
840  double(dval));
841  } else {
842  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
843  tabulate_metadata(lhs_type,
844  min_int64t_per_thread[c],
845  max_int64t_per_thread[c],
846  has_null_per_thread[c],
847  int64_t(dval));
848  }
849  } else {
850  put_null(data_ptr, lhs_type, cd->columnName);
851  has_null_per_thread[c] = true;
852  }
853  } else {
854  CHECK(false);
855  }
856  }
857  }));
858  if (threads.size() >= (size_t)cpu_threads()) {
859  wait_cleanup_threads(threads);
860  }
861  }
862  wait_cleanup_threads(threads);
863 
864  // for unit test
865  if (fragment.unconditionalVacuum_) {
866  if (cd->isDeletedCol) {
867  const auto deleted_offsets = getVacuumOffsets(chunk);
868  if (deleted_offsets.size() > 0) {
869  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
870  return;
871  }
872  }
873  }
874  bool has_null_per_chunk{false};
875  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
876  double min_double_per_chunk{std::numeric_limits<double>::max()};
877  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
878  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
879  for (size_t c = 0; c < ncore; ++c) {
880  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
881  max_double_per_chunk =
882  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
883  min_double_per_chunk =
884  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
885  max_int64t_per_chunk =
886  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
887  min_int64t_per_chunk =
888  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
889  }
890  updateColumnMetadata(cd,
891  fragment,
892  chunk,
893  has_null_per_chunk,
894  max_double_per_chunk,
895  min_double_per_chunk,
896  max_int64t_per_chunk,
897  min_int64t_per_chunk,
898  cd->columnType,
899  updel_roll);
900 }
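A hedged usage sketch: update three rows of a BIGINT column in fragment 0 to the literal 42. Here catalog, td, cd, and fragmenter are assumed to have been obtained from the Catalog; a single RHS value is broadcast to every offset (per the CHECK at line 590), and updel_roll accumulates the dirty chunks and metadata for a later commit or rollback:

  UpdelRoll updel_roll;
  const std::vector<uint64_t> frag_offsets{0, 5, 9};
  const std::vector<ScalarTargetValue> rhs_values{ScalarTargetValue(int64_t(42))};
  fragmenter->updateColumn(catalog,
                           td,
                           cd,
                           /*fragment_id=*/0,
                           frag_offsets,
                           rhs_values,  // one value, broadcast to every offset
                           SQLTypeInfo(kBIGINT, /*notnull=*/false),
                           Data_Namespace::CPU_LEVEL,
                           updel_roll);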
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ updateColumn() [2/2]

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 54 of file UpdelStorage.cpp.

References updateColumn().

62  {
63  updateColumn(catalog,
64  td,
65  cd,
66  fragment_id,
67  frag_offsets,
68  std::vector<ScalarTargetValue>(1, rhs_value),
69  rhs_type,
70  memory_level,
71  updel_roll);
72 }
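In other words, the scalar overload is shorthand for a one-element vector; with the same hypothetical names as the sketch above, these two calls are equivalent:

  fragmenter->updateColumn(catalog, td, cd, 0, frag_offsets,
                           ScalarTargetValue(int64_t(42)),
                           rhs_type, Data_Namespace::CPU_LEVEL, updel_roll);
  fragmenter->updateColumn(catalog, td, cd, 0, frag_offsets,
                           std::vector<ScalarTargetValue>(1, ScalarTargetValue(int64_t(42))),
                           rhs_type, Data_Namespace::CPU_LEVEL, updel_roll);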
+ Here is the call graph for this function:

◆ updateColumnChunkMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnChunkMetadata ( const ColumnDescriptor *  cd,
const int  fragment_id,
const std::shared_ptr< ChunkMetadata >  metadata 
)
override virtual

Updates the metadata for a column chunk.

Parameters
cd- ColumnDescriptor for the column
fragment_id- Fragment id of the chunk within the column
metadata- shared_ptr of the metadata to update column chunk with

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 263 of file InsertOrderFragmenter.cpp.

References CHECK, and ColumnDescriptor::columnId.

266  {
267  // synchronize concurrent accesses to fragmentInfoVec_
268  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
269 
270  CHECK(metadata.get());
271  auto fragment_info = getFragmentInfo(fragment_id);
272  CHECK(fragment_info);
273  fragment_info->setChunkMetadata(cd->columnId, metadata);
274 }
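A small sketch of the intended call pattern, mirroring how this class builds metadata elsewhere (std::make_shared<ChunkMetadata>() filled via the chunk buffer's encoder); the chunk and the fragment id here are assumptions:

  auto metadata = std::make_shared<ChunkMetadata>();
  chunk->getBuffer()->getEncoder()->getMetadata(metadata);  // snapshot encoder state
  fragmenter->updateColumnChunkMetadata(cd, /*fragment_id=*/0, metadata);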

◆ updateColumnMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 902 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

911  {
912  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
913  auto key = std::make_pair(td, &fragment);
914  std::lock_guard<std::mutex> lck(updel_roll.mutex);
915  if (0 == updel_roll.chunkMetadata.count(key)) {
916  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
917  }
918  if (0 == updel_roll.numTuples.count(key)) {
919  updel_roll.numTuples[key] = fragment.shadowNumTuples;
920  }
921  auto& chunkMetadata = updel_roll.chunkMetadata[key];
922 
923  auto buffer = chunk->getBuffer();
924  const auto& lhs_type = cd->columnType;
925 
926  auto encoder = buffer->getEncoder();
927  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
928  static_assert(std::is_same<decltype(min), decltype(max)>::value,
929  "Type mismatch on min/max");
930  if (has_null) {
931  encoder->updateStats(decltype(min)(), true);
932  }
933  if (max < min) {
934  return;
935  }
936  encoder->updateStats(min, false);
937  encoder->updateStats(max, false);
938  };
939 
940  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
941  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
942  } else if (lhs_type.is_fp()) {
943  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
944  } else if (lhs_type.is_decimal()) {
945  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
946  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
947  has_null_per_chunk);
948  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
949  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
950  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
951  }
952  buffer->getEncoder()->getMetadata(chunkMetadata[cd->columnId]);
953 }
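A worked example of the decimal branch above, assuming DECIMAL values are stored as integers scaled by 10^scale: for a DECIMAL(10, 2) column whose per-chunk double bounds came out as [-1.25, 3.50], the stats handed to the encoder are the scaled int64 bounds [-125, 350]:

  #include <cmath>
  #include <cstdint>

  // Scale a double stat to the integer representation used for DECIMAL stats.
  int64_t scale_decimal_stat(const double v, const int scale) {
    return static_cast<int64_t>(v * std::pow(10, scale));
  }
  // scale_decimal_stat(-1.25, 2) == -125
  // scale_decimal_stat( 3.50, 2) ==  350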
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ updateColumns()

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor *>  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll,
Executor *  executor 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 267 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK, checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, parse_ast::end, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

277  {
278  updelRoll.is_varlen_update = true;
279  updelRoll.catalog = catalog;
280  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
281  updelRoll.memoryLevel = memoryLevel;
282 
283  size_t num_entries = sourceDataProvider.getEntryCount();
284  size_t num_rows = sourceDataProvider.getRowCount();
285 
286  if (0 == num_rows) {
287  // bail out early
288  return;
289  }
290 
291  TargetValueConverterFactory factory;
292 
293  auto fragment_ptr = getFragmentInfo(fragmentId);
294  auto& fragment = *fragment_ptr;
295  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
296  get_chunks(catalog, td, fragment, memoryLevel, chunks);
297  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
298  columnDescriptors.size());
299  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
300 
301  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
302  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
303  auto chunk = chunks[indexOfChunk];
304  const auto chunk_cd = chunk->getColumnDesc();
305 
306  if (chunk_cd->isDeletedCol) {
307  deletedChunk = chunk;
308  continue;
309  }
310 
311  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
312  columnDescriptors.end(),
313  [=](const ColumnDescriptor* cd) -> bool {
314  return cd->columnId == chunk_cd->columnId;
315  });
316 
317  if (targetColumnIt != columnDescriptors.end()) {
318  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
319 
320  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
321  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
322 
323  ConverterCreateParameter param{
324  num_rows,
325  *catalog,
326  sourceDataMetaInfo,
327  targetDescriptor,
328  targetDescriptor->columnType,
329  !targetDescriptor->columnType.get_notnull(),
330  sourceDataProvider.getLiteralDictionary(),
331  g_enable_experimental_string_functions
332  ? executor->getStringDictionaryProxy(
333  sourceDataMetaInfo.get_type_info().get_comp_param(),
334  executor->getRowSetMemoryOwner(),
335  true)
336  : nullptr};
337  auto converter = factory.create(param);
338  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
339 
340  if (targetDescriptor->columnType.is_geometry()) {
341  // geometry columns are composites
342  // need to skip chunks, depending on geo type
343  switch (targetDescriptor->columnType.get_type()) {
344  case kMULTIPOLYGON:
345  indexOfChunk += 5;
346  break;
347  case kPOLYGON:
348  indexOfChunk += 4;
349  break;
350  case kLINESTRING:
351  indexOfChunk += 2;
352  break;
353  case kPOINT:
354  indexOfChunk += 1;
355  break;
356  default:
357  CHECK(false); // not supported
358  }
359  }
360  } else {
361  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
362  std::unique_ptr<ChunkToInsertDataConverter> converter;
363 
364  if (chunk_cd->columnType.is_fixlen_array()) {
365  converter =
366  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
367  } else if (chunk_cd->columnType.is_string()) {
368  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
369  } else if (chunk_cd->columnType.is_geometry()) {
370  // the logical geo column is a string column
371  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
372  } else {
373  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
374  }
375 
376  chunkConverters.push_back(std::move(converter));
377 
378  } else if (chunk_cd->columnType.is_date_in_days()) {
379  /* Q: Why do we need this?
380  A: In the variable-length update path we move the chunk content of the
381  column without decoding it. Because it passes through DateDaysEncoder
382  again, the expected value should be in seconds, but here it is in days.
383  DateChunkConverter therefore scales the chunk values to seconds, which
384  DateDaysEncoder then ultimately re-encodes as days.
385  */
386  std::unique_ptr<ChunkToInsertDataConverter> converter;
387  const size_t physical_size = chunk_cd->columnType.get_size();
388  if (physical_size == 2) {
389  converter =
390  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
391  } else if (physical_size == 4) {
392  converter =
393  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
394  } else {
395  CHECK(false);
396  }
397  chunkConverters.push_back(std::move(converter));
398  } else {
399  std::unique_ptr<ChunkToInsertDataConverter> converter;
400  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
401  int logical_size = logical_type.get_size();
402  int physical_size = chunk_cd->columnType.get_size();
403 
404  if (logical_type.is_string()) {
405  // for dicts -> logical = physical
406  logical_size = physical_size;
407  }
408 
409  if (8 == physical_size) {
410  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
411  num_rows, chunk.get());
412  } else if (4 == physical_size) {
413  if (8 == logical_size) {
414  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
415  num_rows, chunk.get());
416  } else {
417  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
418  num_rows, chunk.get());
419  }
420  } else if (2 == chunk_cd->columnType.get_size()) {
421  if (8 == logical_size) {
422  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
423  num_rows, chunk.get());
424  } else if (4 == logical_size) {
425  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
426  num_rows, chunk.get());
427  } else {
428  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
429  num_rows, chunk.get());
430  }
431  } else if (1 == chunk_cd->columnType.get_size()) {
432  if (8 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
434  num_rows, chunk.get());
435  } else if (4 == logical_size) {
436  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
437  num_rows, chunk.get());
438  } else if (2 == logical_size) {
439  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
440  num_rows, chunk.get());
441  } else {
442  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
443  num_rows, chunk.get());
444  }
445  } else {
446  CHECK(false); // unknown
447  }
448 
449  chunkConverters.push_back(std::move(converter));
450  }
451  }
452  }
453 
454  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
455  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
456 
457  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
458  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
459  deletedChunk->getColumnDesc()->tableId,
460  deletedChunk->getColumnDesc()->columnId,
461  fragment.fragmentId};
462  updelRoll.dirtyChunkeys.insert(chunkey);
463  bool* deletedChunkBuffer =
464  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
465 
466  std::atomic<size_t> row_idx{0};
467 
468  auto row_converter = [&sourceDataProvider,
469  &sourceDataConverters,
470  &indexOffFragmentOffsetColumn,
471  &chunkConverters,
472  &deletedChunkBuffer,
473  &row_idx](size_t indexOfEntry) -> void {
474  // convert the source data
475  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
476  if (row.empty()) {
477  return;
478  }
479 
480  size_t indexOfRow = row_idx.fetch_add(1);
481 
482  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
483  if (sourceDataConverters[col]) {
484  const auto& mapd_variant = row[col];
485  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
486  }
487  }
488 
489  auto scalar = checked_get(
490  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
491  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
492 
493  // convert the remaining chunks
494  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
495  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
496  }
497 
498  // now mark the row as deleted
499  deletedChunkBuffer[indexInChunkBuffer] = true;
500  };
501 
502  bool can_go_parallel = num_rows > 20000;
503 
504  if (can_go_parallel) {
505  const size_t num_worker_threads = cpu_threads();
506  std::vector<std::future<void>> worker_threads;
507  for (size_t i = 0,
508  start_entry = 0,
509  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
510  i < num_worker_threads && start_entry < num_entries;
511  ++i, start_entry += stride) {
512  const auto end_entry = std::min(start_entry + stride, num_rows);
513  worker_threads.push_back(std::async(
514  std::launch::async,
515  [&row_converter](const size_t start, const size_t end) {
516  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
517  row_converter(indexOfRow);
518  }
519  },
520  start_entry,
521  end_entry));
522  }
523 
524  for (auto& child : worker_threads) {
525  child.wait();
526  }
527 
528  } else {
529  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
530  row_converter(entryIdx);
531  }
532  }
533 
534  InsertData insert_data;
535  insert_data.databaseId = catalog->getCurrentDB().dbId;
536  insert_data.tableId = td->tableId;
537 
538  for (size_t i = 0; i < chunkConverters.size(); i++) {
539  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
540  continue;
541  }
542 
543  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
544  if (sourceDataConverters[i]) {
545  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
546  }
547  continue;
548  }
549 
550  insert_data.numRows = num_rows;
551  insertDataNoCheckpoint(insert_data);
552 
553  // update metadata
554  if (!deletedChunk->getBuffer()->hasEncoder()) {
555  deletedChunk->initEncoder();
556  }
557  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
558 
559  auto& shadowDeletedChunkMeta =
560  fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
561  if (shadowDeletedChunkMeta->numElements >
562  deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
563  // the append will have populated shadow meta data, otherwise use existing num
564  // elements
565  deletedChunk->getBuffer()->getEncoder()->setNumElems(
566  shadowDeletedChunkMeta->numElements);
567  }
568  deletedChunk->getBuffer()->setUpdated();
569 }
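A self-contained illustration of the date-in-days comment above (lines 379-385), assuming the conventional 86400 seconds per day: the converter scales stored day counts up to epoch seconds precisely so that DateDaysEncoder, which expects seconds, lands back on the original day count:

  #include <cstdint>

  constexpr int64_t kSecsPerDay = 86400;

  int64_t days_to_epoch_seconds(const int64_t days) {  // what the converter emits
    return days * kSecsPerDay;
  }
  int64_t epoch_seconds_to_days(const int64_t secs) {  // what the encoder stores
    return secs / kSecsPerDay;
  }
  // Round trip: epoch_seconds_to_days(days_to_epoch_seconds(18000)) == 18000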
+ Here is the call graph for this function:

◆ updateMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 955 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

957  {
958  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
959  if (updel_roll.chunkMetadata.count(key)) {
960  auto& fragmentInfo = *key.second;
961  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
962  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
963  fragmentInfo.setChunkMetadataMap(chunkMetadata);
964  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
965  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
966  }
967 }

◆ vacuum_fixlen_rows()

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1067 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1070  {
1071  const auto cd = chunk->getColumnDesc();
1072  const auto& col_type = cd->columnType;
1073  auto data_buffer = chunk->getBuffer();
1074  auto data_addr = data_buffer->getMemoryPtr();
1075  auto element_size =
1076  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1077  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1078  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1079  size_t nbytes_fix_data_to_keep = 0;
1080  auto nrows_to_vacuum = frag_offsets.size();
1081  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1082  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1083  auto is_last_one = irow == nrows_to_vacuum;
1084  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1085  auto maddr_to_vacuum = data_addr;
1086  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1087  if (nrows_to_keep > 0) {
1088  auto nbytes_to_keep = nrows_to_keep * element_size;
1089  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1090  // move curr fixlen row block toward front
1091  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1092  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1093  nbytes_to_keep);
1094  }
1095  irow_of_blk_to_fill += nrows_to_keep;
1096  nbytes_fix_data_to_keep += nbytes_to_keep;
1097  }
1098  irow_of_blk_to_keep = irow_to_vacuum + 1;
1099  }
1100  return nbytes_fix_data_to_keep;
1101 }
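A standalone model of the loop above: given ascending offsets of deleted rows, each surviving block is memmove'd toward the front and the kept byte count is returned. The names and the flat int8_t buffer are simplifying assumptions; the real code operates on the chunk's data buffer:

  #include <cstdint>
  #include <cstring>
  #include <vector>

  size_t compact_fixlen(int8_t* data,
                        const size_t element_size,
                        const size_t nrows,
                        const std::vector<uint64_t>& deleted_rows) {
    int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
    int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
    size_t nbytes_kept = 0;
    for (size_t i = 0; i <= deleted_rows.size(); ++i) {
      // Past the last deleted row, the final kept block runs to the end.
      const int64_t irow_to_vacuum =
          (i == deleted_rows.size()) ? nrows : deleted_rows[i];
      const int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
      if (nrows_to_keep > 0) {
        if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
          // Move the surviving block toward the front of the buffer.
          std::memmove(data + irow_of_blk_to_fill * element_size,
                       data + irow_of_blk_to_keep * element_size,
                       nrows_to_keep * element_size);
        }
        irow_of_blk_to_fill += nrows_to_keep;
        nbytes_kept += nrows_to_keep * element_size;
      }
      irow_of_blk_to_keep = irow_to_vacuum + 1;  // skip the deleted row
    }
    return nbytes_kept;
  }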
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ vacuum_varlen_rows()

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1103 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1106  {
1107  auto data_buffer = chunk->getBuffer();
1108  auto index_buffer = chunk->getIndexBuf();
1109  auto data_addr = data_buffer->getMemoryPtr();
1110  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1111  auto index_array = (StringOffsetT*)indices_addr;
1112  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1113  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1114  size_t nbytes_fix_data_to_keep = 0;
1115  size_t nbytes_var_data_to_keep = 0;
1116  auto nrows_to_vacuum = frag_offsets.size();
1117  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1118  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1119  auto is_last_one = irow == nrows_to_vacuum;
1120  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1121  auto maddr_to_vacuum = data_addr;
1122  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1123  if (nrows_to_keep > 0) {
1124  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1125  auto nbytes_to_keep =
1126  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1127  index_array[irow_of_blk_to_keep];
1128  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1129  // move curr varlen row block toward front
1130  memmove(data_addr + ibyte_var_data_to_keep,
1131  data_addr + index_array[irow_of_blk_to_keep],
1132  nbytes_to_keep);
1133 
1134  const auto index_base = index_array[irow_of_blk_to_keep];
1135  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1136  auto& index = index_array[irow_of_blk_to_keep + i];
1137  index = ibyte_var_data_to_keep + (index - index_base);
1138  }
1139  }
1140  nbytes_var_data_to_keep += nbytes_to_keep;
1141  maddr_to_vacuum = indices_addr;
1142 
1143  constexpr static auto index_element_size = sizeof(StringOffsetT);
1144  nbytes_to_keep = nrows_to_keep * index_element_size;
1145  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1146  // move curr fixlen row block toward front
1147  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1148  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1149  nbytes_to_keep);
1150  }
1151  irow_of_blk_to_fill += nrows_to_keep;
1152  nbytes_fix_data_to_keep += nbytes_to_keep;
1153  }
1154  irow_of_blk_to_keep = irow_to_vacuum + 1;
1155  }
1156  return nbytes_var_data_to_keep;
1157 }
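The subtle step in the variable-length case is rebasing the string offsets after the payload bytes move; below is a simplified model of that rebase (lines 1134-1138), assuming int32 offsets (StringOffsetT) in a contiguous index array:

  #include <cstdint>
  #include <vector>

  using StringOffsetT = int32_t;

  // After a kept block's payload has been memmove'd so that it now starts at
  // new_block_start, rewrite each of its row offsets relative to that start.
  void rebase_offsets(std::vector<StringOffsetT>& index_array,
                      const size_t irow_of_blk_to_keep,
                      const int64_t nrows_to_keep,
                      const StringOffsetT new_block_start) {
    const auto index_base = index_array[irow_of_blk_to_keep];  // old block start
    for (int64_t i = 0; i < nrows_to_keep; ++i) {
      auto& index = index_array[irow_of_blk_to_keep + i];
      index = new_block_start + (index - index_base);
    }
  }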
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ catalog_

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected

◆ chunkKeyPrefix_

std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 180 of file InsertOrderFragmenter.h.

◆ columnMap_

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 182 of file InsertOrderFragmenter.h.

◆ dataMgr_

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

◆ defaultInsertLevel_

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

◆ fragmenterType_

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

◆ fragmentInfoMutex_

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

◆ fragmentInfoVec_

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 184 of file InsertOrderFragmenter.h.

◆ hasMaterializedRowId_

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

◆ insertMutex_

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

◆ maxChunkSize_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

◆ maxFragmentId_

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

◆ maxFragmentRows_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 190 of file InsertOrderFragmenter.h.

◆ maxRows_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

◆ mutex_access_inmem_states

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 208 of file InsertOrderFragmenter.h.

◆ numTuples_

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

◆ pageSize_

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

◆ physicalTableId_

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected

◆ rowIdColId_

int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

◆ shard_

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

◆ temp_mutex_

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 231 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

◆ uses_foreign_storage_

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

◆ varLenColInfo_

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 207 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: