OmniSciDB  5ade3759e0
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row sizes of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; they must be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor *> columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Static Public Member Functions

static void updateColumn (const Catalog_Namespace::Catalog *catalog, const std::string &tab_name, const std::string &col_name, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll)
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< FragmentInfo > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.
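For orientation, a minimal construction sketch (not taken from the OmniSciDB sources; the helper name, the id values, and the empty chunk vector are placeholders). The defaults mirror the constructor signature documented below:

#include <memory>
#include <vector>

// Hypothetical helper: builds a fragmenter for one physical, unsharded table.
std::unique_ptr<Fragmenter_Namespace::InsertOrderFragmenter> make_fragmenter(
    Catalog_Namespace::Catalog* catalog, const int db_id, const int table_id) {
  std::vector<int> chunk_key_prefix{db_id, table_id};  // chunk keys start {db, table}
  std::vector<Chunk_NS::Chunk> chunk_vec;  // one Chunk per column, built by the caller
  return std::make_unique<Fragmenter_Namespace::InsertOrderFragmenter>(
      chunk_key_prefix,
      chunk_vec,
      &catalog->getDataMgr(),
      catalog,
      table_id,                     // physicalTableId
      -1,                           // shard: negative means unsharded here
      DEFAULT_FRAGMENT_ROWS,        // per-fragment row limit
      DEFAULT_MAX_CHUNK_SIZE,       // per-chunk byte limit
      DEFAULT_PAGE_SIZE,
      DEFAULT_MAX_ROWS,             // table-wide row cap (see dropFragmentsToSize)
      Data_Namespace::DISK_LEVEL);  // checkpointing insert level
}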

Member Typedef Documentation

◆ ModifyTransactionTracker

using Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker = UpdelRoll

Constructor & Destructor Documentation

◆ InsertOrderFragmenter() [1/2]

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL 
)

◆ ~InsertOrderFragmenter()

Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 86 of file InsertOrderFragmenter.cpp.

86 {}

◆ InsertOrderFragmenter() [2/2]

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter &  )
protected

Member Function Documentation

◆ compactRows()

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1180 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), anonymous_namespace{TypedDataAccessors.h}::get_element_size(), getChunksForAllColumns(), getFragmentInfoFromId(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1185  {
1186  auto& fragment = getFragmentInfoFromId(fragment_id);
1187  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1188  const auto ncol = chunks.size();
1189 
1190  std::vector<int8_t> has_null_per_thread(ncol, 0);
1191  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1192  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1193  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
1194  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());
1195 
1196  // parallel delete columns
1197  std::vector<std::future<void>> threads;
1198  auto nrows_to_vacuum = frag_offsets.size();
1199  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1200  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1201 
1202  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1203  auto chunk = chunks[ci];
1204  const auto cd = chunk->get_column_desc();
1205  const auto& col_type = cd->columnType;
1206  auto data_buffer = chunk->get_buffer();
1207  auto index_buffer = chunk->get_index_buf();
1208  auto data_addr = data_buffer->getMemoryPtr();
1209  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1210  auto index_array = (StringOffsetT*)indices_addr;
1211  bool is_varlen = col_type.is_varlen_indeed();
1212 
1213  auto fixlen_vacuum = [=,
1214  &has_null_per_thread,
1215  &max_double_per_thread,
1216  &min_double_per_thread,
1217  &min_int64t_per_thread,
1218  &max_int64t_per_thread,
1219  &updel_roll,
1220  &frag_offsets,
1221  &fragment] {
1222  size_t nbytes_fix_data_to_keep;
1223  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1224 
1225  data_buffer->encoder->setNumElems(nrows_to_keep);
1226  data_buffer->setSize(nbytes_fix_data_to_keep);
1227  data_buffer->setUpdated();
1228 
1229  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1230 
1231  auto daddr = data_addr;
1232  auto element_size =
1233  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1234  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1235  if (col_type.is_fixlen_array()) {
1236  auto encoder =
1237  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1238  CHECK(encoder);
1239  encoder->updateMetadata((int8_t*)daddr);
1240  } else if (col_type.is_fp()) {
1241  set_chunk_stats(col_type,
1242  data_addr,
1243  has_null_per_thread[ci],
1244  min_double_per_thread[ci],
1245  max_double_per_thread[ci]);
1246  } else {
1247  set_chunk_stats(col_type,
1248  data_addr,
1249  has_null_per_thread[ci],
1250  min_int64t_per_thread[ci],
1251  max_int64t_per_thread[ci]);
1252  }
1253  }
1254  };
1255 
1256  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1257  size_t nbytes_var_data_to_keep;
1258  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1259 
1260  data_buffer->encoder->setNumElems(nrows_to_keep);
1261  data_buffer->setSize(nbytes_var_data_to_keep);
1262  data_buffer->setUpdated();
1263 
1264  index_array[nrows_to_keep] = data_buffer->size();
1265  index_buffer->setSize(sizeof(*index_array) *
1266  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1267  index_buffer->setUpdated();
1268 
1269  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1270  };
1271 
1272  if (is_varlen) {
1273  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1274  } else {
1275  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1276  }
1277  if (threads.size() >= (size_t)cpu_threads()) {
1278  wait_cleanup_threads(threads);
1279  }
1280  }
1281 
1282  wait_cleanup_threads(threads);
1283 
1284  auto key = std::make_pair(td, &fragment);
1285  updel_roll.numTuples[key] = nrows_to_keep;
1286  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1287  auto chunk = chunks[ci];
1288  auto cd = chunk->get_column_desc();
1289  if (!cd->columnType.is_fixlen_array()) {
1290  updateColumnMetadata(cd,
1291  fragment,
1292  chunk,
1293  has_null_per_thread[ci],
1294  max_double_per_thread[ci],
1295  min_double_per_thread[ci],
1296  max_int64t_per_thread[ci],
1297  min_int64t_per_thread[ci],
1298  cd->columnType,
1299  updel_roll);
1300  }
1301  }
1302 }
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, int8_t &has_null, T &min, T &max)
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:877
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
FragmentInfo & getFragmentInfoFromId(const int fragment_id)
#define CHECK(condition)
Definition: Logger.h:187
int cpu_threads()
Definition: thread_count.h:23
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override

◆ createNewFragment()

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 588 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

589  {
590  // also sets the new fragment as the insertBuffer for each column
591 
592  maxFragmentId_++;
593  FragmentInfo newFragmentInfo;
594  newFragmentInfo.fragmentId = maxFragmentId_;
595  newFragmentInfo.shadowNumTuples = 0;
596  newFragmentInfo.setPhysicalNumTuples(0);
597  for (const auto levelSize : dataMgr_->levelSizes_) {
598  newFragmentInfo.deviceIds.push_back(newFragmentInfo.fragmentId % levelSize);
599  }
600  newFragmentInfo.physicalTableId = physicalTableId_;
601  newFragmentInfo.shard = shard_;
602 
603  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
604  colMapIt != columnMap_.end();
605  ++colMapIt) {
606  // colMapIt->second.unpin_buffer();
607  ChunkKey chunkKey = chunkKeyPrefix_;
608  chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
609  chunkKey.push_back(maxFragmentId_);
610  colMapIt->second.createChunkBuffer(
611  dataMgr_,
612  chunkKey,
613  memoryLevel,
614  newFragmentInfo.deviceIds[static_cast<int>(memoryLevel)],
615  pageSize_);
616  colMapIt->second.init_encoder();
617  }
618 
619  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
620  fragmentInfoVec_.push_back(newFragmentInfo);
621  return &(fragmentInfoVec_.back());
622 }
std::vector< int > levelSizes_
Definition: DataMgr.h:119
std::vector< int > ChunkKey
Definition: types.h:35
std::map< int, Chunk_NS::Chunk > columnMap_
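The chunk key assembled in the loop above follows the {database_id, table_id, column_id, fragment_id} layout noted in getChunkMetadata(); a sketch with placeholder ids:

ChunkKey chunk_key{1, 42};  // == chunkKeyPrefix_: {db_id, table_id} (placeholder ids)
chunk_key.push_back(7);     // column id
chunk_key.push_back(12);    // freshly assigned fragment id (maxFragmentId_)
// chunk_key now names exactly one chunk: {1, 42, 7, 12}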

◆ deleteFragments()

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 200 of file InsertOrderFragmenter.cpp.

200  {
201  // Fix a verified loophole on sharded logical table which is locked using logical
202  // tableId while it's its physical tables that can come here when fragments overflow
203  // during COPY. Locks on a logical table and its physical tables never intersect, which
204  // means potential races. It'll be an overkill to resolve a logical table to physical
205  // tables in MapDHandler, ParseNode or other higher layers where the logical table is
206  // locked with Table Read/Write locks; it's easier to lock the logical table of its
207  // physical tables. A downside of this approach may be loss of parallel execution of
208  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
209  // operation, the loss seems not a big deal.
210  auto chunkKeyPrefix = chunkKeyPrefix_;
211  if (shard_ >= 0) {
212  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
213  }
214 
215  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
216  // SELECT and COPY may enter a deadlock
217  using namespace Lock_Namespace;
218  WriteLock delete_lock = TableLockMgr::getWriteLockForTable(chunkKeyPrefix);
219 
220  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
221 
222  for (const auto fragId : dropFragIds) {
223  for (const auto& col : columnMap_) {
224  int colId = col.first;
225  vector<int> fragPrefix = chunkKeyPrefix_;
226  fragPrefix.push_back(colId);
227  fragPrefix.push_back(fragId);
228  dataMgr_->deleteChunksWithPrefix(fragPrefix);
229  }
230  }
231 }
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:2914
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:354
mapd_unique_lock< MutexType > WriteLock
Definition: TableLockMgr.h:47
std::map< int, Chunk_NS::Chunk > columnMap_
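
A standalone sketch of the lock-ordering rule described in the comment above (table lock strictly before fragmentInfoMutex_); the mutexes are stand-ins, not the real TableLockMgr API:

#include <mutex>
#include <shared_mutex>

std::shared_mutex table_lock;           // stands in for the TableLockMgr write lock
std::shared_mutex fragment_info_mutex;  // stands in for fragmentInfoMutex_

void delete_fragments_safely() {
  // Every code path takes the table lock first, then the fragment-info mutex;
  // a consistent order means SELECT and COPY can never deadlock on this pair.
  std::unique_lock<std::shared_mutex> table_write(table_lock);
  std::unique_lock<std::shared_mutex> frag_write(fragment_info_mutex);
  // ... delete the chunks of each (column, fragment) pair ...
}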

◆ dropFragmentsToSize()

void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 173 of file InsertOrderFragmenter.cpp.

References DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

173  {
174  // not safe to call from outside insertData
175  // b/c depends on insertLock around numTuples_
176 
177  // don't ever drop the only fragment!
178  if (numTuples_ == fragmentInfoVec_.back().getPhysicalNumTuples()) {
179  return;
180  }
181 
182  if (numTuples_ > maxRows) {
183  size_t preNumTuples = numTuples_;
184  vector<int> dropFragIds;
185  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
186  while (numTuples_ > targetRows) {
187  assert(fragmentInfoVec_.size() > 0);
188  size_t numFragTuples = fragmentInfoVec_[0].getPhysicalNumTuples();
189  dropFragIds.push_back(fragmentInfoVec_[0].fragmentId);
190  fragmentInfoVec_.pop_front();
191  assert(numTuples_ >= numFragTuples);
192  numTuples_ -= numFragTuples;
193  }
194  deleteFragments(dropFragIds);
195  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
196  << " post: " << numTuples_ << " maxRows: " << maxRows;
197  }
198 }
#define LOG(tag)
Definition: Logger.h:182
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
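
A self-contained model of the loop above; the 0.9 factor is an assumption standing in for DROP_FRAGMENT_FACTOR, whose real value is a macro in InsertOrderFragmenter.cpp:

#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Pops the oldest fragments until the row count drops below the scaled target.
std::vector<int> fragments_to_drop(std::deque<std::pair<int, size_t>>& frags,  // {id, rows}
                                   size_t& num_tuples,
                                   const size_t max_rows) {
  const double kDropFragmentFactor = 0.9;  // assumed stand-in for DROP_FRAGMENT_FACTOR
  std::vector<int> drop_ids;
  if (num_tuples <= max_rows) {
    return drop_ids;
  }
  const auto target_rows = static_cast<size_t>(max_rows * kDropFragmentFactor);
  while (num_tuples > target_rows && frags.size() > 1) {  // never drop the only fragment
    num_tuples -= frags.front().second;
    drop_ids.push_back(frags.front().first);
    frags.pop_front();
  }
  return drop_ids;
}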

◆ getChunkKeyPrefix()

std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 102 of file InsertOrderFragmenter.h.

◆ getChunkMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 88 of file InsertOrderFragmenter.cpp.

References Data_Namespace::DISK_LEVEL, and to_string().

88  {
89  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
90  // memory-resident tables won't have anything on disk
91  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
92  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
93 
94  // data comes like this - database_id, table_id, column_id, fragment_id
95  // but lets sort by database_id, table_id, fragment_id, column_id
96 
97  int fragment_subkey_index = 3;
98  std::sort(chunk_metadata.begin(),
99  chunk_metadata.end(),
100  [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
101  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
102  return pair1.first[3] < pair2.first[3];
103  });
104 
105  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
106  ++chunk_itr) {
107  int cur_column_id = chunk_itr->first[2];
108  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
109 
110  if (fragmentInfoVec_.empty() ||
111  cur_fragment_id != fragmentInfoVec_.back().fragmentId) {
112  maxFragmentId_ = cur_fragment_id;
113  fragmentInfoVec_.emplace_back();
114  fragmentInfoVec_.back().fragmentId = cur_fragment_id;
115  fragmentInfoVec_.back().setPhysicalNumTuples(chunk_itr->second.numElements);
116  numTuples_ += fragmentInfoVec_.back().getPhysicalNumTuples();
117  for (const auto level_size : dataMgr_->levelSizes_) {
118  fragmentInfoVec_.back().deviceIds.push_back(cur_fragment_id % level_size);
119  }
120  fragmentInfoVec_.back().shadowNumTuples =
121  fragmentInfoVec_.back().getPhysicalNumTuples();
122  fragmentInfoVec_.back().physicalTableId = physicalTableId_;
123  fragmentInfoVec_.back().shard = shard_;
124  } else {
125  if (chunk_itr->second.numElements !=
126  fragmentInfoVec_.back().getPhysicalNumTuples()) {
127  throw std::runtime_error(
128  "Inconsistency in num tuples within fragment for table " +
129  std::to_string(physicalTableId_) + ", Column " +
130  std::to_string(cur_column_id) + ". Fragment Tuples: " +
131  std::to_string(fragmentInfoVec_.back().getPhysicalNumTuples()) +
132  ", Chunk Tuples: " + std::to_string(chunk_itr->second.numElements));
133  }
134  }
135  fragmentInfoVec_.back().setChunkMetadata(cur_column_id, chunk_itr->second);
136  }
137  }
138 
139  ssize_t maxFixedColSize = 0;
140 
141  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
142  ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
143  if (size == -1) { // variable length
144  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
145  size = 8; // b/c we use this for string and array indices - gross to have magic
146  // number here
147  }
148  maxFixedColSize = std::max(maxFixedColSize, size);
149  }
150 
151  // this is maximum number of rows assuming everything is fixed length
152  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
153 
154  if (fragmentInfoVec_.size() > 0) {
155  // Now need to get the insert buffers for each column - should be last
156  // fragment
157  int lastFragmentId = fragmentInfoVec_.back().fragmentId;
158  int deviceId =
159  fragmentInfoVec_.back().deviceIds[static_cast<int>(defaultInsertLevel_)];
160  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
161  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
162  insertKey.push_back(colIt->first); // column id
163  insertKey.push_back(lastFragmentId); // fragment id
164  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
165  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
166  if (varLenColInfoIt != varLenColInfo_.end()) {
167  varLenColInfoIt->second = colIt->second.get_buffer()->size();
168  }
169  }
170  }
171 }
std::vector< int > levelSizes_
Definition: DataMgr.h:119
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:330
std::unordered_map< int, size_t > varLenColInfo_
std::vector< int > ChunkKey
Definition: types.h:35
std::map< int, Chunk_NS::Chunk > columnMap_
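The clamp at line 152 guarantees that no fixed-length chunk can outgrow maxChunkSize_; a worked example with assumed sizes:

#include <algorithm>
#include <cstddef>

// Assumed values: a 1 GB chunk budget and an 8-byte widest fixed-length column.
const size_t max_chunk_size = 1073741824;                    // maxChunkSize_
const size_t max_fixed_col_size = 8;                         // widest fixed column, bytes
const size_t row_cap = max_chunk_size / max_fixed_col_size;  // 134217728 rows
// A configured limit of 32000000 rows survives the clamp; any limit above
// 134217728 rows would be reduced so the widest chunk still fits in 1 GB.
const size_t max_fragment_rows = std::min<size_t>(32000000, row_cap);  // 32000000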

◆ getChunksForAllColumns()

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 990 of file UpdelStorage.cpp.

References catalog_, CHECK, Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

993  {
994  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
995  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
996  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
997  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
998  ++ncol;
999  if (!cd->isVirtualCol) {
1000  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1001  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1002  ChunkKey chunk_key{
1003  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1004  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1005  &catalog_->getDataMgr(),
1006  chunk_key,
1007  memory_level,
1008  0,
1009  chunk_meta_it->second.numBytes,
1010  chunk_meta_it->second.numElements);
1011  chunks.push_back(chunk);
1012  }
1013  }
1014  }
1015  return chunks;
1016 }
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35

◆ getFragmenterId()

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 101 of file InsertOrderFragmenter.h.

101 { return chunkKeyPrefix_.back(); }

◆ getFragmenterType()

std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 106 of file InsertOrderFragmenter.h.

◆ getFragmentInfoFromId()

FragmentInfo & Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected

Definition at line 45 of file UpdelStorage.cpp.

References CHECK, and fragmentInfoVec_.

Referenced by compactRows(), updateColumn(), and updateColumns().

45  {
46  auto fragment_it = std::find_if(
47  fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
48  return f.fragmentId == fragment_id;
49  });
50  CHECK(fragment_it != fragmentInfoVec_.end());
51  return *fragment_it;
52 }
#define CHECK(condition)
Definition: Logger.h:187

◆ getFragmentsForQuery()

TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row sizes of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 624 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

624  {
625  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
626  TableInfo queryInfo;
627  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
628  // right now we don't test predicate, so just return (copy of) all fragments
629  bool fragmentsExist = false;
630  if (fragmentInfoVec_.empty()) {
631  // If we have no fragments add a dummy empty fragment to make the executor
632  // not have separate logic for 0-row tables
633  int maxFragmentId = 0;
634  FragmentInfo emptyFragmentInfo;
635  emptyFragmentInfo.fragmentId = maxFragmentId;
636  emptyFragmentInfo.shadowNumTuples = 0;
637  emptyFragmentInfo.setPhysicalNumTuples(0);
638  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
639  emptyFragmentInfo.physicalTableId = physicalTableId_;
640  emptyFragmentInfo.shard = shard_;
641  queryInfo.fragments.push_back(emptyFragmentInfo);
642  } else {
643  fragmentsExist = true;
644  queryInfo.fragments = fragmentInfoVec_; // makes a copy
645  }
646  readLock.unlock();
647  queryInfo.setPhysicalNumTuples(0);
648  auto partIt = queryInfo.fragments.begin();
649  if (fragmentsExist) {
650  while (partIt != queryInfo.fragments.end()) {
651  if (partIt->getPhysicalNumTuples() == 0) {
652  // this means that a concurrent insert query inserted tuples into a new fragment
653  // but when the query came in we didn't have this fragment. To make sure we don't
654  // mess up the executor we delete this fragment from the metadatamap (fixes
655  // earlier bug found 2015-05-08)
656  partIt = queryInfo.fragments.erase(partIt);
657  } else {
658  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
659  partIt->getPhysicalNumTuples());
660  ++partIt;
661  }
662  }
663  } else {
664  // We added a dummy fragment and know the table is empty
665  queryInfo.setPhysicalNumTuples(0);
666  }
667  return queryInfo;
668 }
std::vector< int > levelSizes_
Definition: DataMgr.h:119
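A short usage sketch (fragmenter is assumed to point at a populated InsertOrderFragmenter):

Fragmenter_Namespace::TableInfo info = fragmenter->getFragmentsForQuery();
size_t total_rows = 0;
for (const auto& frag : info.fragments) {
  total_rows += frag.getPhysicalNumTuples();  // 0-tuple fragments were already erased
}
// total_rows matches info.getPhysicalNumTuples(), which the method precomputed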

◆ getNumRows()

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual

◆ getVacuumOffsets()

const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1019 of file UpdelStorage.cpp.

References CHECK, cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1020  {
1021  const auto data_buffer = chunk->get_buffer();
1022  const auto data_addr = data_buffer->getMemoryPtr();
1023  const size_t nrows_in_chunk = data_buffer->size();
1024  const size_t ncore = cpu_threads();
1025  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1026  std::vector<std::vector<uint64_t>> deleted_offsets;
1027  deleted_offsets.resize(ncore);
1028  std::vector<std::future<void>> threads;
1029  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1030  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1031  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1032  const auto ithread = rbegin / segsz;
1033  CHECK(ithread < deleted_offsets.size());
1034  deleted_offsets[ithread].reserve(segsz);
1035  for (size_t r = rbegin; r < rend; ++r) {
1036  if (data_addr[r]) {
1037  deleted_offsets[ithread].push_back(r);
1038  }
1039  }
1040  }));
1041  }
1042  wait_cleanup_threads(threads);
1043  std::vector<uint64_t> all_deleted_offsets;
1044  for (size_t i = 0; i < ncore; ++i) {
1045  all_deleted_offsets.insert(
1046  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1047  }
1048  return all_deleted_offsets;
1049 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
#define CHECK(condition)
Definition: Logger.h:187
int cpu_threads()
Definition: thread_count.h:23
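The same split/scan/merge pattern reduced to a standalone sketch over a plain delete-flag array; the function name and parameters are illustrative, not part of the class API:

#include <algorithm>
#include <cstdint>
#include <future>
#include <vector>

// Each worker scans one segment of the flag array and records the set positions.
std::vector<uint64_t> find_set_offsets(const int8_t* flags, const size_t n, const size_t ncore) {
  const size_t segsz = (n + ncore - 1) / ncore;  // ceil(n / ncore) rows per worker
  std::vector<std::vector<uint64_t>> per_seg(ncore);
  std::vector<std::future<void>> workers;
  for (size_t begin = 0; begin < n; begin += segsz) {
    workers.emplace_back(std::async(std::launch::async, [=, &per_seg] {
      const size_t end = std::min(begin + segsz, n);
      auto& out = per_seg[begin / segsz];
      for (size_t r = begin; r < end; ++r) {
        if (flags[r]) {
          out.push_back(r);
        }
      }
    }));
  }
  for (auto& w : workers) {
    w.wait();
  }
  std::vector<uint64_t> all;  // segments concatenate in order, so offsets stay sorted
  for (const auto& seg : per_seg) {
    all.insert(all.end(), seg.begin(), seg.end());
  }
  return all;
}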

◆ insertData()

void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 316 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData(), and Fragmenter_Namespace::ChunkToInsertDataConverter::~ChunkToInsertDataConverter().

316  {
317  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
318  try {
319  mapd_unique_lock<mapd_shared_mutex> insertLock(
320  insertMutex_); // prevent two threads from trying to insert into the same table
321  // simultaneously
322 
323  insertDataImpl(insertDataStruct);
324 
325  if (defaultInsertLevel_ ==
326  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
327  dataMgr_->checkpoint(
328  chunkKeyPrefix_[0],
329  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
330  }
331  } catch (...) {
332  int32_t tableEpoch =
333  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
334 
335  // the statement below deletes *this* object!
336  // relying on exception propagation at this stage
337  // until we can sort this out in a cleaner fashion
338  catalog_->setTableEpoch(
339  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
340  throw;
341  }
342 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:406
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2123
void insertDataImpl(InsertData &insertDataStruct)
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2153
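A hedged usage sketch of a single-column, three-row insert. The field names follow the InsertData members referenced above; the ids and the int64 payload are assumptions about the target column:

Fragmenter_Namespace::InsertData insert_data;
insert_data.databaseId = 1;       // placeholder ids
insert_data.tableId = 42;
insert_data.columnIds = {1};      // one physical column
insert_data.replicate_count = 0;  // plain insert, not an ALTER ADD COLUMN fill

std::vector<int64_t> values{10, 20, 30};  // assumes a BIGINT column
DataBlockPtr block;
block.numbersPtr = reinterpret_cast<int8_t*>(values.data());
insert_data.data = {block};
insert_data.numRows = 3;

// Checkpoints on success; on failure it resets the table epoch and rethrows.
fragmenter->insertData(insert_data);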

◆ insertDataImpl()

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 430 of file InsertOrderFragmenter.cpp.

References CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, Fragmenter_Namespace::InsertData::replicate_count, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

430  {
431  // populate deleted system column if it should exists, as it will not come from client
432  // Do not add this magical column in the replicate ALTER TABLE ADD route as
433  // it is not needed and will cause issues
434  std::unique_ptr<int8_t[]> data_for_deleted_column;
435  for (const auto& cit : columnMap_) {
436  if (cit.second.get_column_desc()->isDeletedCol &&
437  insertDataStruct.replicate_count == 0) {
438  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
439  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
440  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
441  insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
442  insertDataStruct.columnDescriptors[cit.first] = cit.second.get_column_desc();
443  break;
444  }
445  }
446  // MAT we need to add a removal of the empty column we pushed onto here
447  // for upstream safety. Should not be a problem but need to fix.
448 
449  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
450  for (const auto columnId : insertDataStruct.columnIds) {
451  if (columnMap_.end() == columnMap_.find(columnId)) {
452  columnMap_.emplace(
453  columnId, Chunk_NS::Chunk(insertDataStruct.columnDescriptors.at(columnId)));
454  }
455  }
456 
457  // when replicate (add) column(s), this path seems wont work; go separate route...
458  if (insertDataStruct.replicate_count > 0) {
459  replicateData(insertDataStruct);
460  return;
461  }
462 
463  std::unordered_map<int, int> inverseInsertDataColIdMap;
464  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
465  inverseInsertDataColIdMap.insert(
466  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
467  }
468 
469  size_t numRowsLeft = insertDataStruct.numRows;
470  size_t numRowsInserted = 0;
471  vector<DataBlockPtr> dataCopy =
472  insertDataStruct.data; // bc append data will move ptr forward and this violates
473  // constness of InsertData
474  if (numRowsLeft <= 0) {
475  return;
476  }
477 
478  FragmentInfo* currentFragment = 0;
479 
480  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
481  currentFragment = createNewFragment(defaultInsertLevel_);
482  } else {
483  currentFragment = &(fragmentInfoVec_.back());
484  }
485  size_t startFragment = fragmentInfoVec_.size() - 1;
486 
487  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
488  // loop until done inserting all rows
489  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
490  size_t rowsLeftInCurrentFragment =
491  maxFragmentRows_ - currentFragment->shadowNumTuples;
492  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
493  if (rowsLeftInCurrentFragment != 0) {
494  for (auto& varLenColInfoIt : varLenColInfo_) {
495  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
496  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
497  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
498  if (insertIdIt != inverseInsertDataColIdMap.end()) {
499  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
500  numRowsToInsert = std::min(
501  numRowsToInsert,
502  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
503  numRowsToInsert,
504  numRowsInserted,
505  bytesLeft));
506  }
507  }
508  }
509 
510  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
511  currentFragment = createNewFragment(defaultInsertLevel_);
512  if (numRowsInserted == 0) {
513  startFragment++;
514  }
515  rowsLeftInCurrentFragment = maxFragmentRows_;
516  for (auto& varLenColInfoIt : varLenColInfo_) {
517  varLenColInfoIt.second = 0; // reset byte counter
518  }
519  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
520  for (auto& varLenColInfoIt : varLenColInfo_) {
521  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
522  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
523  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
524  if (insertIdIt != inverseInsertDataColIdMap.end()) {
525  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
526  numRowsToInsert = std::min(
527  numRowsToInsert,
528  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
529  numRowsToInsert,
530  numRowsInserted,
531  bytesLeft));
532  }
533  }
534  }
535 
536  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
537  // never be able to insert anything
538 
539  // for each column, append the data in the appropriate insert buffer
540  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
541  int columnId = insertDataStruct.columnIds[i];
542  auto colMapIt = columnMap_.find(columnId);
543  assert(colMapIt != columnMap_.end());
544  currentFragment->shadowChunkMetadataMap[columnId] =
545  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
546  auto varLenColInfoIt = varLenColInfo_.find(columnId);
547  if (varLenColInfoIt != varLenColInfo_.end()) {
548  varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
549  }
550  }
551  if (hasMaterializedRowId_) {
552  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
553  currentFragment->shadowNumTuples;
554  int64_t* rowIdData = new int64_t[numRowsToInsert];
555  for (size_t i = 0; i < numRowsToInsert; ++i) {
556  rowIdData[i] = i + startId;
557  }
558  DataBlockPtr rowIdBlock;
559  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData);
560  auto colMapIt = columnMap_.find(rowIdColId_);
561  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
562  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
563  delete[] rowIdData;
564  }
565 
566  currentFragment->shadowNumTuples =
567  fragmentInfoVec_.back().getPhysicalNumTuples() + numRowsToInsert;
568  numRowsLeft -= numRowsToInsert;
569  numRowsInserted += numRowsToInsert;
570  }
571  {
572  // Only take the fragment info lock when updating fragment info map. Otherwise,
573  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
574  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
575  // COPY_FROM waits for TableWriteLock
576  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
577  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
578  partIt != fragmentInfoVec_.end();
579  ++partIt) {
580  partIt->setPhysicalNumTuples(partIt->shadowNumTuples);
581  partIt->setChunkMetadataMap(partIt->shadowChunkMetadataMap);
582  }
583  }
584  numTuples_ += insertDataStruct.numRows;
585  dropFragmentsToSize(maxRows_);
586 }
void dropFragmentsToSize(const size_t maxRows) override
Will truncate table to less than maxRows by dropping fragments.
#define CHECK_GT(x, y)
Definition: Logger.h:199
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:198
std::unordered_map< int, size_t > varLenColInfo_
void replicateData(const InsertData &insertDataStruct)
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:137
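
The outer while loop's fragment-splitting arithmetic, modeled standalone; the 32M limit is an assumption standing in for DEFAULT_FRAGMENT_ROWS, and the variable-length byte budgeting (getNumElemsForBytesInsertData) is omitted:

#include <algorithm>
#include <cstddef>

// Example: 70M rows arrive while the open fragment already holds 10M of an
// assumed 32M-row limit; fragments then receive 22M, 32M, and 16M rows.
void split_rows(size_t rows_left) {
  const size_t max_fragment_rows = 32000000;  // assumed DEFAULT_FRAGMENT_ROWS
  size_t shadow_tuples = 10000000;            // rows already in the open fragment
  while (rows_left > 0) {
    const size_t room = max_fragment_rows - shadow_tuples;
    if (room == 0) {  // fragment is full: corresponds to createNewFragment()
      shadow_tuples = 0;
      continue;
    }
    const size_t to_insert = std::min(room, rows_left);
    rows_left -= to_insert;  // appendData() for each column happens here
    shadow_tuples += to_insert;
  }
}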

◆ insertDataNoCheckpoint()

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; they must be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 344 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

344  {
345  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
346  mapd_unique_lock<mapd_shared_mutex> insertLock(
347  insertMutex_); // prevent two threads from trying to insert into the same table
348  // simultaneously
349  insertDataImpl(insertDataStruct);
350 }
void insertDataImpl(InsertData &insertDataStruct)

◆ lockInsertCheckpointData()

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected

◆ operator=()

InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter &  )
protected

◆ replicateData()

void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 352 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, CHECK, Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

352  {
353  size_t numRowsLeft = insertDataStruct.numRows;
354  for (auto& fragmentInfo : fragmentInfoVec_) {
355  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysical();
356  auto numRowsToInsert = fragmentInfo.getPhysicalNumTuples(); // not getNumTuples()
357  size_t numRowsCanBeInserted;
358  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
359  if (insertDataStruct.bypass[i]) {
360  continue;
361  }
362  auto columnId = insertDataStruct.columnIds[i];
363  auto colDesc = insertDataStruct.columnDescriptors.at(columnId);
364  CHECK(columnMap_.find(columnId) != columnMap_.end());
365 
366  ChunkKey chunkKey = chunkKeyPrefix_;
367  chunkKey.push_back(columnId);
368  chunkKey.push_back(fragmentInfo.fragmentId);
369 
370  auto colMapIt = columnMap_.find(columnId);
371  auto& chunk = colMapIt->second;
372  if (chunk.isChunkOnDevice(
373  dataMgr_,
374  chunkKey,
375  defaultInsertLevel_,
376  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)])) {
377  dataMgr_->deleteChunksWithPrefix(chunkKey);
378  }
379  chunk.createChunkBuffer(
380  dataMgr_,
381  chunkKey,
382  defaultInsertLevel_,
383  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)]);
384  chunk.init_encoder();
385 
386  try {
387  DataBlockPtr dataCopy = insertDataStruct.data[i];
388  auto size = colDesc->columnType.get_size();
389  if (0 > size) {
390  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
391  varLenColInfo_[columnId] = 0;
392  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
393  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
394  } else {
395  numRowsCanBeInserted = maxChunkSize_ / size;
396  }
397 
398  // FIXME: abort a case in which new column is wider than existing columns
399  if (numRowsCanBeInserted < numRowsToInsert) {
400  throw std::runtime_error("new column '" + colDesc->columnName +
401  "' wider than existing columns is not supported");
402  }
403 
404  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
405  {
406  std::unique_lock<std::mutex> lck(*fragmentInfo.mutex_access_inmem_states);
407  fragmentInfo.shadowChunkMetadataMap[columnId] = chunkMetadata;
408  }
409 
410  // update total size of var-len column in (actually the last) fragment
411  if (0 > size) {
412  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
413  varLenColInfo_[columnId] = chunk.get_buffer()->size();
414  }
415  } catch (...) {
416  dataMgr_->deleteChunksWithPrefix(chunkKey);
417  throw;
418  }
419  }
420  numRowsLeft -= numRowsToInsert;
421  }
422  CHECK(0 == numRowsLeft);
423 
424  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
425  for (auto& fragmentInfo : fragmentInfoVec_) {
426  fragmentInfo.setChunkMetadataMap(fragmentInfo.shadowChunkMetadataMap);
427  }
428 }
std::shared_ptr< std::mutex > mutex_access_inmem_states
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:354
std::unordered_map< int, size_t > varLenColInfo_
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
std::map< int, Chunk_NS::Chunk > columnMap_

◆ setNumRows()

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual

◆ updateChunkStats()

void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 233 of file InsertOrderFragmenter.cpp.

References CHECK, ChunkMetadata::chunkStats, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), LOG, showChunk(), VLOG, and logger::WARNING.

235  {
236  /**
237   * WARNING: This method is entirely unlocked. Higher level locks are expected to
238   * prevent any table read or write during a chunk metadata update, since we need to
239   * modify various buffers and metadata maps.
240   */
241  if (shard_ >= 0) {
242  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
243  }
244 
245  CHECK(cd);
246  const auto column_id = cd->columnId;
247  const auto col_itr = columnMap_.find(column_id);
248  CHECK(col_itr != columnMap_.end());
249 
250  for (auto& fragment : fragmentInfoVec_) {
251  auto stats_itr = stats_map.find(fragment.fragmentId);
252  if (stats_itr != stats_map.end()) {
253  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(column_id);
254  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
255  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
256  physicalTableId_,
257  column_id,
258  fragment.fragmentId};
259  auto chunk = Chunk_NS::Chunk::getChunk(cd,
260  &catalog_->getDataMgr(),
261  chunk_key,
262  Data_Namespace::DISK_LEVEL,
263  0,
264  chunk_meta_it->second.numBytes,
265  chunk_meta_it->second.numElements);
266  auto buf = chunk->get_buffer();
267  CHECK(buf);
268  auto encoder = buf->encoder.get();
269  if (!encoder) {
270  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
271  }
272 
273  auto chunk_stats = stats_itr->second;
274 
275  ChunkMetadata old_chunk_metadata;
276  encoder->getMetadata(old_chunk_metadata);
277  auto& old_chunk_stats = old_chunk_metadata.chunkStats;
278 
279  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
280  // Use the logical type to display data, since the encoding should be ignored
281  const auto logical_ti = get_logical_type_info(cd->columnType);
282  if (!didResetStats) {
283  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
284  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
285  << DatumToString(chunk_stats.max, logical_ti);
286  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
287  << DatumToString(chunk_stats.min, logical_ti);
288  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
289  continue; // move to next fragment
290  }
291 
292  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
293  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
294  << DatumToString(chunk_stats.max, logical_ti);
295  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
296  << DatumToString(chunk_stats.min, logical_ti);
297  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
298 
299  // Reset fragment metadata map and set buffer to dirty
300  ChunkMetadata new_metadata;
301  // Run through fillChunkStats to ensure any transformations to the raw metadata
302  // values get applied (e.g. for date in days)
303  encoder->getMetadata(new_metadata);
304  fragment.setChunkMetadata(column_id, new_metadata);
305  fragment.shadowChunkMetadataMap =
306  fragment.getChunkMetadataMap(); // TODO(adb): needed?
307  buf->setDirty();
308  } else {
309  LOG(WARNING) << "No chunk stats update found for fragment " << fragment.fragmentId
310  << ", table " << physicalTableId_ << ", "
311  << ", column " << column_id;
312  }
313  }
314 }
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:193
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
#define LOG(tag)
Definition: Logger.h:182
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:840
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int > ChunkKey
Definition: types.h:35
SQLTypeInfo columnType
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:277
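A hedged calling sketch: stats_map is keyed by fragment id, and the Datum access via bigintval assumes a BIGINT column; catalog, table_id, and the column name are placeholders. Per the warning above, the caller must already hold table-level locks:

std::unordered_map<int, ChunkStats> stats_map;
ChunkStats stats;
stats.min.bigintval = 0;     // narrowed minimum (assumes a BIGINT column)
stats.max.bigintval = 1000;  // narrowed maximum
stats.has_nulls = false;
stats_map[0] = stats;        // key is the fragment id

const ColumnDescriptor* cd = catalog->getMetadataForColumn(table_id, "x");  // placeholders
fragmenter->updateChunkStats(cd, stats_map);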

◆ updateColumn() [1/3]

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const std::string &  tab_name,
const std::string &  col_name,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
static

Definition at line 60 of file UpdelStorage.cpp.

References CHECK, Catalog_Namespace::Catalog::getMetadataForColumn(), and Catalog_Namespace::Catalog::getMetadataForTable().

Referenced by anonymous_namespace{UpdelStorageTest.cpp}::prepare_table_for_delete(), anonymous_namespace{UpdelStorageTest.cpp}::update_common(), and updateColumn().

68  {
69  const auto td = catalog->getMetadataForTable(tab_name);
70  CHECK(td);
71  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
72  CHECK(cd);
73  td->fragmenter->updateColumn(catalog,
74  td,
75  cd,
76  fragment_id,
77  frag_offsets,
78  rhs_values,
79  rhs_type,
80  memory_level,
81  updel_roll);
82 }
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHECK(condition)
Definition: Logger.h:187
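A hedged usage sketch of this name-based overload; the table and column names, fragment id, and float payload are placeholders. Passing a single rhs value broadcasts it to every offset (see the nrow/n_rhs_values check in the instance overload below):

UpdelRoll updel_roll;  // accumulates dirty chunks for a later commit or rollback
const std::vector<uint64_t> frag_offsets{3, 7};  // row offsets within the fragment
const std::vector<ScalarTargetValue> rhs_values{ScalarTargetValue(float(1.5))};
Fragmenter_Namespace::InsertOrderFragmenter::updateColumn(catalog,
                                                          "t",  // table name (placeholder)
                                                          "f",  // column name (placeholder)
                                                          0,    // fragment id
                                                          frag_offsets,
                                                          rhs_values,
                                                          SQLTypeInfo(kFLOAT, false),
                                                          Data_Namespace::CPU_LEVEL,
                                                          updel_roll);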

◆ updateColumn() [2/3]

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 581 of file UpdelStorage.cpp.

References anonymous_namespace{ExecuteTest.cpp}::c(), UpdelRoll::catalog, CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfoCore< TYPE_FACET_PACK >::get_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), anonymous_namespace{TypedDataAccessors.h}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfoFromId(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_boolean(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), v(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

589  {
590  updel_roll.catalog = catalog;
591  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
592  updel_roll.memoryLevel = memory_level;
593 
594  const size_t ncore = cpu_threads();
595  const auto nrow = frag_offsets.size();
596  const auto n_rhs_values = rhs_values.size();
597  if (0 == nrow) {
598  return;
599  }
600  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
601 
602  auto& fragment = getFragmentInfoFromId(fragment_id);
603  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
604  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
605  ChunkKey chunk_key{
606  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
607  auto chunk = Chunk_NS::Chunk::getChunk(cd,
608  &catalog->getDataMgr(),
609  chunk_key,
610  Data_Namespace::CPU_LEVEL,
611  0,
612  chunk_meta_it->second.numBytes,
613  chunk_meta_it->second.numElements);
614 
615  std::vector<int8_t> has_null_per_thread(ncore, 0);
616  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
617  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
618  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
619  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
620 
621  // parallel update elements
622  std::vector<std::future<void>> threads;
623 
624  const auto segsz = (nrow + ncore - 1) / ncore;
625  auto dbuf = chunk->get_buffer();
626  auto dbuf_addr = dbuf->getMemoryPtr();
627  dbuf->setUpdated();
628  {
629  std::lock_guard<std::mutex> lck(updel_roll.mutex);
630  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
631  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
632  }
633 
634  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
635  cd->tableId,
636  cd->columnId,
637  fragment.fragmentId};
638  updel_roll.dirtyChunkeys.insert(chunkey);
639  }
640  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
641  threads.emplace_back(std::async(
642  std::launch::async,
643  [=,
644  &has_null_per_thread,
645  &min_int64t_per_thread,
646  &max_int64t_per_thread,
647  &min_double_per_thread,
648  &max_double_per_thread,
649  &frag_offsets,
650  &rhs_values] {
651  SQLTypeInfo lhs_type = cd->columnType;
652 
653  // !! Not sure if this is an undocumented convention or a bug, but for a
654  // sharded table the dictionary id of a dict-encoded string column is not
655  // specified by comp_param in the physical table but in the logical table;
656  // comp_param in the physical table is always 0, so adapt accordingly...
657  auto cdl = (shard_ < 0)
658  ? cd
659  : catalog->getMetadataForColumn(
660  catalog->getLogicalTableId(td->tableId), cd->columnId);
661  CHECK(cdl);
662  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
663  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
664  lhs_type, &decimalOverflowValidator);
665  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
666  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
667  lhs_type, &dateDaysOverflowValidator);
668 
669  StringDictionary* stringDict{nullptr};
670  if (lhs_type.is_string()) {
671  CHECK(kENCODING_DICT == lhs_type.get_compression());
672  auto dictDesc = const_cast<DictDescriptor*>(
673  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
674  CHECK(dictDesc);
675  stringDict = dictDesc->stringDict.get();
676  CHECK(stringDict);
677  }
678 
679  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
680  const auto roffs = frag_offsets[r];
681  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
682  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
683  ScalarTargetValue sv2;
684 
685  // Subtlety: there are two cases of string-to-string assignment where
686  // upstream passes the RHS string as a string index instead of a
687  // "real string".
688  // case #1. For "SET str_col = str_literal", a temporary string index
689  //          cannot be resolved in this layer, so if upstream passes a
690  //          string index here, an exception is thrown.
691  // case #2. For "SET str_col1 = str_col2", the RHS string index is
692  //          resolved via the RHS dictionary's getString() and re-encoded
693  //          with the LHS dictionary's getOrAdd() below.
694  if (rhs_type.is_string()) {
695  if (const auto vp = boost::get<int64_t>(sv)) {
696  auto dictDesc = const_cast<DictDescriptor*>(
697  catalog->getMetadataForDict(rhs_type.get_comp_param()));
698  if (nullptr == dictDesc) {
699  throw std::runtime_error(
700  "UPDATE does not support cast from string literal to string "
701  "column.");
702  }
703  auto stringDict = dictDesc->stringDict.get();
704  CHECK(stringDict);
705  sv2 = NullableString(stringDict->getString(*vp));
706  sv = &sv2;
707  }
708  }
709 
710  if (const auto vp = boost::get<int64_t>(sv)) {
711  auto v = *vp;
712  if (lhs_type.is_string()) {
713  throw std::runtime_error("UPDATE does not support cast to string.");
714  }
715  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
716  if (lhs_type.is_decimal()) {
717  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
718  int64_t decimal_val;
719  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
720  tabulate_metadata(lhs_type,
721  min_int64t_per_thread[c],
722  max_int64t_per_thread[c],
723  has_null_per_thread[c],
724  (v == inline_int_null_value<int64_t>() &&
725  lhs_type.get_notnull() == false)
726  ? v
727  : decimal_val);
728  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
729  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
730  if (positive_v_and_negative_d || negative_v_and_positive_d) {
731  throw std::runtime_error(
732  "Data conversion overflow on " + std::to_string(v) +
733  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
734  std::to_string(rhs_type.get_scale()) + ") to (" +
735  std::to_string(lhs_type.get_dimension()) + ", " +
736  std::to_string(lhs_type.get_scale()) + ")");
737  }
738  } else if (is_integral(lhs_type)) {
739  if (lhs_type.is_date_in_days()) {
740  // Store meta values in seconds
741  if (lhs_type.get_size() == 2) {
742  nullAwareDateOverflowValidator.validate<int16_t>(v);
743  } else {
744  nullAwareDateOverflowValidator.validate<int32_t>(v);
745  }
746  int64_t days;
747  get_scalar<int64_t>(data_ptr, lhs_type, days);
748  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
749  tabulate_metadata(lhs_type,
750  min_int64t_per_thread[c],
751  max_int64t_per_thread[c],
752  has_null_per_thread[c],
753  (v == inline_int_null_value<int64_t>() &&
754  lhs_type.get_notnull() == false)
755  ? NullSentinelSupplier()(lhs_type, v)
756  : seconds);
757  } else {
758  int64_t target_value;
759  if (rhs_type.is_decimal()) {
760  target_value = round(decimal_to_double(rhs_type, v));
761  } else {
762  target_value = v;
763  }
764  tabulate_metadata(lhs_type,
765  min_int64t_per_thread[c],
766  max_int64t_per_thread[c],
767  has_null_per_thread[c],
768  target_value);
769  }
770  } else {
771  tabulate_metadata(
772  lhs_type,
773  min_double_per_thread[c],
774  max_double_per_thread[c],
775  has_null_per_thread[c],
776  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
777  }
778  } else if (const auto vp = boost::get<double>(sv)) {
779  auto v = *vp;
780  if (lhs_type.is_string()) {
781  throw std::runtime_error("UPDATE does not support cast to string.");
782  }
783  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
784  if (lhs_type.is_integer()) {
785  tabulate_metadata(lhs_type,
786  min_int64t_per_thread[c],
787  max_int64t_per_thread[c],
788  has_null_per_thread[c],
789  int64_t(v));
790  } else if (lhs_type.is_fp()) {
791  tabulate_metadata(lhs_type,
792  min_double_per_thread[c],
793  max_double_per_thread[c],
794  has_null_per_thread[c],
795  double(v));
796  } else {
797  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
798  "LHS with a floating RHS.";
799  }
800  } else if (const auto vp = boost::get<float>(sv)) {
801  auto v = *vp;
802  if (lhs_type.is_string()) {
803  throw std::runtime_error("UPDATE does not support cast to string.");
804  }
805  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
806  if (lhs_type.is_integer()) {
807  tabulate_metadata(lhs_type,
808  min_int64t_per_thread[c],
809  max_int64t_per_thread[c],
810  has_null_per_thread[c],
811  int64_t(v));
812  } else {
813  tabulate_metadata(lhs_type,
814  min_double_per_thread[c],
815  max_double_per_thread[c],
816  has_null_per_thread[c],
817  double(v));
818  }
819  } else if (const auto vp = boost::get<NullableString>(sv)) {
820  const auto s = boost::get<std::string>(vp);
821  const auto sval = s ? *s : std::string("");
822  if (lhs_type.is_string()) {
823  decltype(stringDict->getOrAdd(sval)) sidx;
824  {
825  std::unique_lock<std::mutex> lock(temp_mutex_);
826  sidx = stringDict->getOrAdd(sval);
827  }
828  put_scalar<int32_t>(data_ptr, lhs_type, sidx, cd->columnName);
829  tabulate_metadata(lhs_type,
830  min_int64t_per_thread[c],
831  max_int64t_per_thread[c],
832  has_null_per_thread[c],
833  int64_t(sidx));
834  } else if (sval.size() > 0) {
835  auto dval = std::atof(sval.data());
836  if (lhs_type.is_boolean()) {
837  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
838  } else if (lhs_type.is_time()) {
839  throw std::runtime_error(
840  "Date/Time/Timestamp update not supported through translated "
841  "string path.");
842  }
843  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
844  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
845  tabulate_metadata(lhs_type,
846  min_double_per_thread[c],
847  max_double_per_thread[c],
848  has_null_per_thread[c],
849  double(dval));
850  } else {
851  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
852  tabulate_metadata(lhs_type,
853  min_int64t_per_thread[c],
854  max_int64t_per_thread[c],
855  has_null_per_thread[c],
856  int64_t(dval));
857  }
858  } else {
859  put_null(data_ptr, lhs_type, cd->columnName);
860  has_null_per_thread[c] = true;
861  }
862  } else {
863  CHECK(false);
864  }
865  }
866  }));
867  if (threads.size() >= (size_t)cpu_threads()) {
868  wait_cleanup_threads(threads);
869  }
870  }
871  wait_cleanup_threads(threads);
872 
873  // for unit test
874  if (FragmentInfo::unconditionalVacuum_) {
875  if (cd->isDeletedCol) {
876  const auto deleted_offsets = getVacuumOffsets(chunk);
877  if (deleted_offsets.size() > 0) {
878  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
879  return;
880  }
881  }
882  }
883  bool has_null_per_chunk{false};
884  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
885  double min_double_per_chunk{std::numeric_limits<double>::max()};
886  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
887  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
888  for (size_t c = 0; c < ncore; ++c) {
889  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
890  max_double_per_chunk =
891  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
892  min_double_per_chunk =
893  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
894  max_int64t_per_chunk =
895  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
896  min_int64t_per_chunk =
897  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
898  }
899  updateColumnMetadata(cd,
900  fragment,
901  chunk,
902  has_null_per_chunk,
903  max_double_per_chunk,
904  min_double_per_chunk,
905  max_int64t_per_chunk,
906  min_int64t_per_chunk,
907  cd->columnType,
908  updel_roll);
909 }
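For orientation, here is a minimal sketch of driving this overload, assuming the engine headers are available and that fragmenter, catalog, td, and cd have already been resolved; the fragment id, offsets, and value are purely illustrative:

  // Hypothetical driver: write one literal value to three rows of a
  // fragment; metadata changes are staged in updel_roll for a later commit.
  UpdelRoll updel_roll;
  const std::vector<uint64_t> frag_offsets{0, 3, 7};             // rows to touch
  const std::vector<ScalarTargetValue> rhs_values{int64_t(42)};  // single value
  fragmenter->updateColumn(catalog,
                           td,
                           cd,
                           /*fragment_id=*/0,
                           frag_offsets,
                           rhs_values,
                           cd->columnType,  // rhs_type of the literal
                           Data_Namespace::CPU_LEVEL,
                           updel_roll);

Note that a single-element rhs_values is broadcast to every offset, per the CHECK(nrow == n_rhs_values || 1 == n_rhs_values) at line 600.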

◆ updateColumn() [3/3]

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 84 of file UpdelStorage.cpp.

References updateColumn().

92  {
93  updateColumn(catalog,
94  td,
95  cd,
96  fragment_id,
97  frag_offsets,
98  std::vector<ScalarTargetValue>(1, rhs_value),
99  rhs_type,
100  memory_level,
101  updel_roll);
102 }
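That is, this convenience overload broadcasts a single rhs_value to all offsets in frag_offsets by wrapping it in a one-element vector and delegating to the vector overload documented above.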

◆ updateColumnMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 911 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and ColumnDescriptor::tableId.

Referenced by compactRows(), and updateColumn().

920  {
921  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
922  auto key = std::make_pair(td, &fragment);
923  std::lock_guard<std::mutex> lck(updel_roll.mutex);
924  if (0 == updel_roll.chunkMetadata.count(key)) {
925  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
926  }
927  if (0 == updel_roll.numTuples.count(key)) {
928  updel_roll.numTuples[key] = fragment.shadowNumTuples;
929  }
930  auto& chunkMetadata = updel_roll.chunkMetadata[key];
931 
932  auto buffer = chunk->get_buffer();
933  const auto& lhs_type = cd->columnType;
934 
935  auto update_stats = [& encoder = buffer->encoder](auto min, auto max, auto has_null) {
936  static_assert(std::is_same<decltype(min), decltype(max)>::value,
937  "Type mismatch on min/max");
938  if (has_null) {
939  encoder->updateStats(decltype(min)(), true);
940  }
941  if (max < min) {
942  return;
943  }
944  encoder->updateStats(min, false);
945  encoder->updateStats(max, false);
946  };
947 
948  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
949  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
950  } else if (lhs_type.is_fp()) {
951  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
952  } else if (lhs_type.is_decimal()) {
953  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
954  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
955  has_null_per_chunk);
956  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
957  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
958  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
959  }
960  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
961 
962  // removed as @alex suggests. keep it commented in case of any chance to revisit
963  // it once after vacuum code is introduced. fragment.invalidateChunkMetadataMap();
964 }
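The per-chunk extrema and null flag received here are reductions of the per-thread accumulators built in updateColumn() (lines 888-898). A self-contained sketch of that folding step, using plain std types rather than the engine's encoder API:

  #include <algorithm>
  #include <cstdint>
  #include <limits>
  #include <vector>

  // Fold per-thread min/max/has-null accumulators into chunk-level stats.
  struct Stats {
    int64_t min = std::numeric_limits<int64_t>::max();
    int64_t max = std::numeric_limits<int64_t>::min();
    bool has_null = false;
  };

  Stats fold(const std::vector<Stats>& per_thread) {
    Stats chunk;
    for (const auto& t : per_thread) {
      chunk.min = std::min(chunk.min, t.min);
      chunk.max = std::max(chunk.max, t.max);
      chunk.has_null = chunk.has_null || t.has_null;
    }
    return chunk;  // max < min means "no values seen"; update_stats skips it
  }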

◆ updateColumns()

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor *>  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 292 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK, checked_get(), Fragmenter_Namespace::RowDataProvider::count(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), getFragmentInfoFromId(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), insertDataNoCheckpoint(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

301  {
302  if (!is_feature_enabled<VarlenUpdates>()) {
303  throw std::runtime_error("varlen UPDATE path not enabled.");
304  }
305 
306  updelRoll.is_varlen_update = true;
307  updelRoll.catalog = catalog;
308  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
309  updelRoll.memoryLevel = memoryLevel;
310 
311  size_t num_rows = sourceDataProvider.count();
312 
313  if (0 == num_rows) {
314  // bail out early
315  return;
316  }
317 
318  TargetValueConverterFactory factory;
319 
320  auto& fragment = getFragmentInfoFromId(fragmentId);
321  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
322  get_chunks(catalog, td, fragment, memoryLevel, chunks);
323  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
324  columnDescriptors.size());
325  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
326 
327  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
328  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
329  auto chunk = chunks[indexOfChunk];
330  const auto chunk_cd = chunk->get_column_desc();
331 
332  if (chunk_cd->isDeletedCol) {
333  deletedChunk = chunk;
334  continue;
335  }
336 
337  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
338  columnDescriptors.end(),
339  [=](const ColumnDescriptor* cd) -> bool {
340  return cd->columnId == chunk_cd->columnId;
341  });
342 
343  if (targetColumnIt != columnDescriptors.end()) {
344  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
345 
346  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
347  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
348 
348 
349  ConverterCreateParameter param{num_rows,
350  *catalog,
351  sourceDataMetaInfo,
352  targetDescriptor,
353  targetDescriptor->columnType,
354  !targetDescriptor->columnType.get_notnull(),
355  sourceDataProvider.getLiteralDictionary()};
356  auto converter = factory.create(param);
357  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
358 
359  if (targetDescriptor->columnType.is_geometry()) {
360  // geometry columns are composites
361  // need to skip chunks, depending on geo type
362  switch (targetDescriptor->columnType.get_type()) {
363  case kMULTIPOLYGON:
364  indexOfChunk += 5;
365  break;
366  case kPOLYGON:
367  indexOfChunk += 4;
368  break;
369  case kLINESTRING:
370  indexOfChunk += 2;
371  break;
372  case kPOINT:
373  indexOfChunk += 1;
374  break;
375  default:
376  CHECK(false); // not supported
377  }
378  }
379  } else {
380  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
381  std::unique_ptr<ChunkToInsertDataConverter> converter;
382 
383  if (chunk_cd->columnType.is_fixlen_array()) {
384  converter =
385  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
386  } else if (chunk_cd->columnType.is_string()) {
387  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
388  } else if (chunk_cd->columnType.is_geometry()) {
389  // the logical geo column is a string column
390  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
391  } else {
392  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
393  }
394 
395  chunkConverters.push_back(std::move(converter));
396 
397  } else if (chunk_cd->columnType.is_date_in_days()) {
398  /* Q: Why do we need this?
399  A: In the variable-length update path we move the chunk content of the
400  column without decoding. Since it passes through DateDaysEncoder again,
401  the expected value should be in seconds, but here it is in days.
402  Therefore DateChunkConverter scales the chunk values to seconds,
403  which DateDaysEncoder then encodes back into days.
404  */
405  std::unique_ptr<ChunkToInsertDataConverter> converter;
406  const size_t physical_size = chunk_cd->columnType.get_size();
407  if (physical_size == 2) {
408  converter =
409  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
410  } else if (physical_size == 4) {
411  converter =
412  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
413  } else {
414  CHECK(false);
415  }
416  chunkConverters.push_back(std::move(converter));
417  } else {
418  std::unique_ptr<ChunkToInsertDataConverter> converter;
419  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
420  int logical_size = logical_type.get_size();
421  int physical_size = chunk_cd->columnType.get_size();
422 
423  if (logical_type.is_string()) {
424  // for dicts -> logical = physical
425  logical_size = physical_size;
426  }
427 
428  if (8 == physical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
430  num_rows, chunk.get());
431  } else if (4 == physical_size) {
432  if (8 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
434  num_rows, chunk.get());
435  } else {
436  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
437  num_rows, chunk.get());
438  }
439  } else if (2 == chunk_cd->columnType.get_size()) {
440  if (8 == logical_size) {
441  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
442  num_rows, chunk.get());
443  } else if (4 == logical_size) {
444  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
445  num_rows, chunk.get());
446  } else {
447  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
448  num_rows, chunk.get());
449  }
450  } else if (1 == chunk_cd->columnType.get_size()) {
451  if (8 == logical_size) {
452  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
453  num_rows, chunk.get());
454  } else if (4 == logical_size) {
455  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
456  num_rows, chunk.get());
457  } else if (2 == logical_size) {
458  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
459  num_rows, chunk.get());
460  } else {
461  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
462  num_rows, chunk.get());
463  }
464  } else {
465  CHECK(false); // unknown
466  }
467 
468  chunkConverters.push_back(std::move(converter));
469  }
470  }
471  }
472 
473  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
474  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
475 
476  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
477  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
478  deletedChunk->get_column_desc()->tableId,
479  deletedChunk->get_column_desc()->columnId,
480  fragment.fragmentId};
481  updelRoll.dirtyChunkeys.insert(chunkey);
482  bool* deletedChunkBuffer =
483  reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());
484 
485  auto row_converter = [&sourceDataProvider,
486  &sourceDataConverters,
487  &indexOffFragmentOffsetColumn,
488  &chunkConverters,
489  &deletedChunkBuffer](size_t indexOfRow) -> void {
490  // convert the source data
491  const auto row = sourceDataProvider.getEntryAt(indexOfRow);
492 
493  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
494  if (sourceDataConverters[col]) {
495  const auto& mapd_variant = row[col];
496  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
497  }
498  }
499 
500  auto scalar = checked_get(
501  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
502  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
503 
504  // convert the remaining chunks
505  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
506  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
507  }
508 
509  // now mark the row as deleted
510  deletedChunkBuffer[indexInChunkBuffer] = true;
511  };
512 
513  bool can_go_parallel = num_rows > 20000;
514 
515  if (can_go_parallel) {
516  const size_t num_worker_threads = cpu_threads();
517  std::vector<std::future<void>> worker_threads;
518  for (size_t i = 0,
519  start_entry = 0,
520  stride = (num_rows + num_worker_threads - 1) / num_worker_threads;
521  i < num_worker_threads && start_entry < num_rows;
522  ++i, start_entry += stride) {
523  const auto end_entry = std::min(start_entry + stride, num_rows);
524  worker_threads.push_back(std::async(
525  std::launch::async,
526  [&row_converter](const size_t start, const size_t end) {
527  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
528  row_converter(indexOfRow);
529  }
530  },
531  start_entry,
532  end_entry));
533  }
534 
535  for (auto& child : worker_threads) {
536  child.wait();
537  }
538 
539  } else {
540  for (size_t indexOfRow = 0; indexOfRow < num_rows; indexOfRow++) {
541  row_converter(indexOfRow);
542  }
543  }
544 
545  InsertData insert_data;
546  insert_data.databaseId = catalog->getCurrentDB().dbId;
547  insert_data.tableId = td->tableId;
548 
549  for (size_t i = 0; i < chunkConverters.size(); i++) {
550  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
551  continue;
552  }
553 
554  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
555  if (sourceDataConverters[i]) {
556  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
557  }
558  continue;
559  }
560 
561  insert_data.numRows = num_rows;
562  insertDataNoCheckpoint(insert_data);
563 
564  // update metadata
565  if (!deletedChunk->get_buffer()->hasEncoder) {
566  deletedChunk->init_encoder();
567  }
568  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);
569 
570  auto& shadowDeletedChunkMeta =
571  fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
572  if (shadowDeletedChunkMeta.numElements >
573  deletedChunk->get_buffer()->encoder->getNumElems()) {
574  // the append will have populated shadow meta data, otherwise use existing num
575  // elements
576  deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
577  }
578  deletedChunk->get_buffer()->setUpdated();
579 }
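For row counts above the 20,000-row threshold, row_converter is applied with one async task per contiguous stripe of rows. A generic, self-contained sketch of the same striping pattern (run_striped is a hypothetical helper, not an OmniSciDB API):

  #include <algorithm>
  #include <cstddef>
  #include <future>
  #include <vector>

  // Split [0, num_rows) into contiguous stripes, one async task per stripe;
  // assumes num_threads > 0.
  template <typename F>
  void run_striped(const size_t num_rows, const size_t num_threads, F&& per_row) {
    std::vector<std::future<void>> workers;
    const size_t stride = (num_rows + num_threads - 1) / num_threads;
    for (size_t start = 0; start < num_rows; start += stride) {
      const size_t end = std::min(start + stride, num_rows);
      workers.push_back(std::async(std::launch::async, [start, end, &per_row] {
        for (size_t i = start; i < end; ++i) {
          per_row(i);  // e.g. the row_converter lambda from the listing
        }
      }));
    }
    for (auto& w : workers) {
      w.wait();  // block until every stripe has finished
    }
  }

Invoked as run_striped(num_rows, cpu_threads(), row_converter), this reproduces the worker-thread loop at lines 515-537.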

◆ updateMetadata()

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 966 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

968  {
969  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
970  if (updel_roll.chunkMetadata.count(key)) {
971  auto& fragmentInfo = *key.second;
972  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
973  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
974  fragmentInfo.setChunkMetadataMap(chunkMetadata);
975  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
976  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
977  // TODO(ppan): When fragment-level compaction is enable, the following code
978  // should suffice. When not (ie. existing code), we'll revert to update
979  // InsertOrderFragmenter::varLenColInfo_
980  /*
981  for (const auto cit : chunkMetadata) {
982  const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
983  if (cd.columnType.get_size() < 0)
984  fragmentInfo.varLenColInfox[cd.columnId] = cit.second.numBytes;
985  }
986  */
987  }
988 }
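updateMetadata() is thus the commit step of an update: under the fragmentInfoMutex_ write lock it promotes the chunk metadata and tuple count staged in the UpdelRoll into both the shadow and live maps of the target FragmentInfo.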

◆ vacuum_fixlen_rows()

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1088 of file UpdelStorage.cpp.

References anonymous_namespace{TypedDataAccessors.h}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1091  {
1092  const auto cd = chunk->get_column_desc();
1093  const auto& col_type = cd->columnType;
1094  auto data_buffer = chunk->get_buffer();
1095  auto data_addr = data_buffer->getMemoryPtr();
1096  auto element_size =
1097  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1098  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1099  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1100  size_t nbytes_fix_data_to_keep = 0;
1101  auto nrows_to_vacuum = frag_offsets.size();
1102  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1103  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1104  auto is_last_one = irow == nrows_to_vacuum;
1105  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1106  auto maddr_to_vacuum = data_addr;
1107  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1108  if (nrows_to_keep > 0) {
1109  auto nbytes_to_keep = nrows_to_keep * element_size;
1110  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1111  // move curr fixlen row block toward front
1112  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1113  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1114  nbytes_to_keep);
1115  }
1116  irow_of_blk_to_fill += nrows_to_keep;
1117  nbytes_fix_data_to_keep += nbytes_to_keep;
1118  }
1119  irow_of_blk_to_keep = irow_to_vacuum + 1;
1120  }
1121  return nbytes_fix_data_to_keep;
1122 }
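A standalone toy run of the same compaction loop (illustrative only, not engine code): vacuuming rows {2, 5} out of eight 4-byte rows slides the surviving blocks toward the front and reports the bytes kept.

  #include <cstdint>
  #include <cstring>
  #include <vector>

  int main() {
    std::vector<int32_t> rows{10, 11, 12, 13, 14, 15, 16, 17};
    const std::vector<uint64_t> del_offsets{2, 5};  // rows to vacuum
    int64_t keep_head = 0;  // head of the next row block to keep
    int64_t fill = 0;       // row offset the kept block is moved to
    auto* base = reinterpret_cast<int8_t*>(rows.data());
    constexpr size_t kElem = sizeof(int32_t);
    for (size_t i = 0; i <= del_offsets.size(); ++i) {
      const int64_t next = (i == del_offsets.size())
                               ? static_cast<int64_t>(rows.size())
                               : static_cast<int64_t>(del_offsets[i]);
      const int64_t nrows_to_keep = next - keep_head;
      if (nrows_to_keep > 0) {  // memmove is a no-op until the first gap
        std::memmove(base + fill * kElem, base + keep_head * kElem,
                     nrows_to_keep * kElem);
        fill += nrows_to_keep;
      }
      keep_head = next + 1;
    }
    rows.resize(fill);  // rows == {10, 11, 13, 14, 16, 17}; 24 bytes kept
    return 0;
  }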

◆ vacuum_varlen_rows()

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1124 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1127  {
1128  auto data_buffer = chunk->get_buffer();
1129  auto index_buffer = chunk->get_index_buf();
1130  auto data_addr = data_buffer->getMemoryPtr();
1131  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1132  auto index_array = (StringOffsetT*)indices_addr;
1133  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1134  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1135  size_t nbytes_fix_data_to_keep = 0;
1136  size_t nbytes_var_data_to_keep = 0;
1137  auto nrows_to_vacuum = frag_offsets.size();
1138  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1139  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1140  auto is_last_one = irow == nrows_to_vacuum;
1141  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1142  auto maddr_to_vacuum = data_addr;
1143  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1144  if (nrows_to_keep > 0) {
1145  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1146  auto nbytes_to_keep =
1147  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1148  index_array[irow_of_blk_to_keep];
1149  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1150  // move curr varlen row block toward front
1151  memmove(data_addr + ibyte_var_data_to_keep,
1152  data_addr + index_array[irow_of_blk_to_keep],
1153  nbytes_to_keep);
1154 
1155  const auto index_base = index_array[irow_of_blk_to_keep];
1156  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1157  auto& index = index_array[irow_of_blk_to_keep + i];
1158  index = ibyte_var_data_to_keep + (index - index_base);
1159  }
1160  }
1161  nbytes_var_data_to_keep += nbytes_to_keep;
1162  maddr_to_vacuum = indices_addr;
1163 
1164  constexpr static auto index_element_size = sizeof(StringOffsetT);
1165  nbytes_to_keep = nrows_to_keep * index_element_size;
1166  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1167  // move curr fixlen row block toward front
1168  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1169  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1170  nbytes_to_keep);
1171  }
1172  irow_of_blk_to_fill += nrows_to_keep;
1173  nbytes_fix_data_to_keep += nbytes_to_keep;
1174  }
1175  irow_of_blk_to_keep = irow_to_vacuum + 1;
1176  }
1177  return nbytes_var_data_to_keep;
1178 }
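The variable-length variant applies the same block compaction twice per kept block: once to the payload bytes, rebasing each surviving offset in index_array relative to its new position, and once to the fixed-width StringOffsetT index array itself; unlike vacuum_fixlen_rows(), it returns the number of payload (not index) bytes kept.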

Member Data Documentation

◆ catalog_

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected

◆ chunkKeyPrefix_

std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 180 of file InsertOrderFragmenter.h.

◆ columnMap_

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 182 of file InsertOrderFragmenter.h.

◆ dataMgr_

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

◆ defaultInsertLevel_

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

◆ fragmenterType_

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

◆ fragmentInfoMutex_

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

◆ fragmentInfoVec_

std::deque<FragmentInfo> Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 184 of file InsertOrderFragmenter.h.

Referenced by getFragmentInfoFromId().

◆ hasMaterializedRowId_

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

◆ insertMutex_

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

◆ maxChunkSize_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

◆ maxFragmentId_

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

◆ maxFragmentRows_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 190 of file InsertOrderFragmenter.h.

◆ maxRows_

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

◆ mutex_access_inmem_states

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 207 of file InsertOrderFragmenter.h.

◆ numTuples_

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

◆ pageSize_

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

◆ physicalTableId_

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected

◆ rowIdColId_

int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

◆ shard_

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

◆ temp_mutex_

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutableprotected

Definition at line 230 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

◆ varLenColInfo_

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 206 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: