OmniSciDB  eee9fa949c
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>

Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row sizes of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as a string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Static Public Member Functions

static void updateColumn (const Catalog_Namespace::Catalog *catalog, const std::string &tab_name, const std::string &col_name, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll)
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< FragmentInfo > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.
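A minimal usage sketch follows. The ids are hypothetical, and data_mgr, catalog, and insert_data are assumed to come from an already-initialized OmniSciDB instance; in practice the Catalog constructs the fragmenter when a table's metadata is loaded.

 const int db_id = 1;     // hypothetical database id
 const int table_id = 7;  // hypothetical table id
 std::vector<int> chunk_key_prefix{db_id, table_id};  // chunk keys are prefixed {database_id, table_id}
 std::vector<Chunk_NS::Chunk> chunk_vec;              // one Chunk per column of the table

 Fragmenter_Namespace::InsertOrderFragmenter fragmenter(
     chunk_key_prefix,
     chunk_vec,
     data_mgr,   // Data_Namespace::DataMgr*
     catalog,    // Catalog_Namespace::Catalog*
     table_id,   // physicalTableId
     -1);        // shard; the remaining size parameters keep their defaults

 // Rows are appended to the newest fragment; new fragments are created as
 // needed, and a checkpoint is taken for disk-resident data.
 fragmenter.insertData(insert_data);

 // Snapshot of fragment ids, row counts, and chunk metadata for a query.
 Fragmenter_Namespace::TableInfo table_info = fragmenter.getFragmentsForQuery();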

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 86 of file InsertOrderFragmenter.cpp.

86 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter &  )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1209 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfoFromId(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1214  {
1215  auto& fragment = getFragmentInfoFromId(fragment_id);
1216  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1217  const auto ncol = chunks.size();
1218 
1219  std::vector<int8_t> has_null_per_thread(ncol, 0);
1220  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1221  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1222  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::min());
1223  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::max());
1224 
1225  // parallel delete columns
1226  std::vector<std::future<void>> threads;
1227  auto nrows_to_vacuum = frag_offsets.size();
1228  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1229  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1230 
1231  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1232  auto chunk = chunks[ci];
1233  const auto cd = chunk->get_column_desc();
1234  const auto& col_type = cd->columnType;
1235  auto data_buffer = chunk->get_buffer();
1236  auto index_buffer = chunk->get_index_buf();
1237  auto data_addr = data_buffer->getMemoryPtr();
1238  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1239  auto index_array = (StringOffsetT*)indices_addr;
1240  bool is_varlen = col_type.is_varlen_indeed();
1241 
1242  auto fixlen_vacuum = [=,
1243  &has_null_per_thread,
1244  &max_double_per_thread,
1245  &min_double_per_thread,
1246  &min_int64t_per_thread,
1247  &max_int64t_per_thread,
1248  &updel_roll,
1249  &frag_offsets,
1250  &fragment] {
1251  size_t nbytes_fix_data_to_keep;
1252  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1253 
1254  data_buffer->encoder->setNumElems(nrows_to_keep);
1255  data_buffer->setSize(nbytes_fix_data_to_keep);
1256  data_buffer->setUpdated();
1257 
1258  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1259 
1260  auto daddr = data_addr;
1261  auto element_size =
1262  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1263  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1264  if (col_type.is_fixlen_array()) {
1265  auto encoder =
1266  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1267  CHECK(encoder);
1268  encoder->updateMetadata((int8_t*)daddr);
1269  } else if (col_type.is_fp()) {
1270  set_chunk_stats(col_type,
1271  data_addr,
1272  has_null_per_thread[ci],
1273  min_double_per_thread[ci],
1274  max_double_per_thread[ci]);
1275  } else {
1276  set_chunk_stats(col_type,
1277  data_addr,
1278  has_null_per_thread[ci],
1279  min_int64t_per_thread[ci],
1280  max_int64t_per_thread[ci]);
1281  }
1282  }
1283  };
1284 
1285  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1286  size_t nbytes_var_data_to_keep;
1287  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1288 
1289  data_buffer->encoder->setNumElems(nrows_to_keep);
1290  data_buffer->setSize(nbytes_var_data_to_keep);
1291  data_buffer->setUpdated();
1292 
1293  index_array[nrows_to_keep] = data_buffer->size();
1294  index_buffer->setSize(sizeof(*index_array) *
1295  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1296  index_buffer->setUpdated();
1297 
1298  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1299  };
1300 
1301  if (is_varlen) {
1302  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1303  } else {
1304  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1305  }
1306  if (threads.size() >= (size_t)cpu_threads()) {
1307  wait_cleanup_threads(threads);
1308  }
1309  }
1310 
1311  wait_cleanup_threads(threads);
1312 
1313  auto key = std::make_pair(td, &fragment);
1314  updel_roll.numTuples[key] = nrows_to_keep;
1315  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1316  auto chunk = chunks[ci];
1317  auto cd = chunk->get_column_desc();
1318  if (!cd->columnType.is_fixlen_array()) {
1319  updateColumnMetadata(cd,
1320  fragment,
1321  chunk,
1322  has_null_per_thread[ci],
1323  max_double_per_thread[ci],
1324  min_double_per_thread[ci],
1325  max_int64t_per_thread[ci],
1326  min_int64t_per_thread[ci],
1327  cd->columnType,
1328  updel_roll);
1329  }
1330  }
1331 }
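The listing above throttles its per-column lambdas with a simple bounded-async pattern: futures accumulate until their count reaches cpu_threads(), then wait_cleanup_threads() drains the batch before more work is launched. A self-contained sketch of the same pattern (process_one and kMaxInFlight are illustrative stand-ins, not names from the source):

 #include <future>
 #include <vector>

 void process_one(int item) { /* e.g. vacuum one column chunk */ }

 void process_all(const std::vector<int>& items, const size_t kMaxInFlight) {
   std::vector<std::future<void>> threads;
   for (const int item : items) {
     threads.emplace_back(std::async(std::launch::async, process_one, item));
     if (threads.size() >= kMaxInFlight) {
       for (auto& t : threads) {  // same role as wait_cleanup_threads()
         t.wait();
       }
       threads.clear();
     }
   }
   for (auto& t : threads) {  // drain the final partial batch
     t.wait();
   }
 }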

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 588 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

589  {
590  // also sets the new fragment as the insertBuffer for each column
591 
592  maxFragmentId_++;
593  FragmentInfo newFragmentInfo;
594  newFragmentInfo.fragmentId = maxFragmentId_;
595  newFragmentInfo.shadowNumTuples = 0;
596  newFragmentInfo.setPhysicalNumTuples(0);
597  for (const auto levelSize : dataMgr_->levelSizes_) {
598  newFragmentInfo.deviceIds.push_back(newFragmentInfo.fragmentId % levelSize);
599  }
600  newFragmentInfo.physicalTableId = physicalTableId_;
601  newFragmentInfo.shard = shard_;
602 
603  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
604  colMapIt != columnMap_.end();
605  ++colMapIt) {
606  // colMapIt->second.unpin_buffer();
607  ChunkKey chunkKey = chunkKeyPrefix_;
608  chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
609  chunkKey.push_back(maxFragmentId_);
610  colMapIt->second.createChunkBuffer(
611  dataMgr_,
612  chunkKey,
613  memoryLevel,
614  newFragmentInfo.deviceIds[static_cast<int>(memoryLevel)],
615  pageSize_);
616  colMapIt->second.init_encoder();
617  }
618 
619  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
620  fragmentInfoVec_.push_back(newFragmentInfo);
621  return &(fragmentInfoVec_.back());
622 }
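As the loop above shows, each new chunk's key is the fragmenter's {database_id, table_id} prefix extended with the column id and the new fragment id. A worked example with hypothetical ids:

 ChunkKey chunk_key = {1, 7};  // chunkKeyPrefix_: database 1, table 7
 chunk_key.push_back(3);       // columnId of this column
 chunk_key.push_back(42);      // maxFragmentId_ after the increment
 // chunk_key == {1, 7, 3, 42}, i.e. {database, table, column, fragment}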

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 200 of file InsertOrderFragmenter.cpp.

References catalog_().

200  {
201  // Fix a verified loophole on sharded logical table which is locked using logical
202  // tableId while it's its physical tables that can come here when fragments overflow
203  // during COPY. Locks on a logical table and its physical tables never intersect, which
204  // means potential races. It'll be an overkill to resolve a logical table to physical
205  // tables in MapDHandler, ParseNode or other higher layers where the logical table is
206  // locked with Table Read/Write locks; it's easier to lock the logical table of its
207  // physical tables. A downside of this approach may be loss of parallel execution of
208  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
209  // operation, the loss seems not a big deal.
210  auto chunkKeyPrefix = chunkKeyPrefix_;
211  if (shard_ >= 0) {
212  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
213  }
214 
215  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
216  // SELECT and COPY may enter a deadlock
217  using namespace Lock_Namespace;
218  WriteLock delete_lock = TableLockMgr::getWriteLockForTable(chunkKeyPrefix);
219 
220  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
221 
222  for (const auto fragId : dropFragIds) {
223  for (const auto& col : columnMap_) {
224  int colId = col.first;
225  vector<int> fragPrefix = chunkKeyPrefix_;
226  fragPrefix.push_back(colId);
227  fragPrefix.push_back(fragId);
228  dataMgr_->deleteChunksWithPrefix(fragPrefix);
229  }
230  }
231 }

void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 173 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

173  {
174  // not safe to call from outside insertData
175  // b/c depends on insertLock around numTuples_
176 
177  // don't ever drop the only fragment!
178  if (numTuples_ == fragmentInfoVec_.back().getPhysicalNumTuples()) {
179  return;
180  }
181 
182  if (numTuples_ > maxRows) {
183  size_t preNumTuples = numTuples_;
184  vector<int> dropFragIds;
185  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
186  while (numTuples_ > targetRows) {
187  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
188  size_t numFragTuples = fragmentInfoVec_[0].getPhysicalNumTuples();
189  dropFragIds.push_back(fragmentInfoVec_[0].fragmentId);
190  fragmentInfoVec_.pop_front();
191  CHECK_GE(numTuples_, numFragTuples);
192  numTuples_ -= numFragTuples;
193  }
194  deleteFragments(dropFragIds);
195  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
196  << " post: " << numTuples_ << " maxRows: " << maxRows;
197  }
198 }
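The target is maxRows * DROP_FRAGMENT_FACTOR rather than maxRows itself, which leaves headroom so the next insert does not immediately re-trigger a drop. A worked example (the factor value below is assumed for illustration only; the real value is defined in InsertOrderFragmenter.cpp):

 size_t numTuples = 1000000;               // current table size
 size_t maxRows = 800000;                  // configured cap
 double kDropFragmentFactor = 0.9;         // assumed value for this example
 size_t targetRows = maxRows * kDropFragmentFactor;  // 720000
 // Oldest fragments are popped until numTuples <= targetRows; with
 // 100000-row fragments, three drops land numTuples at 700000.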
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 102 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 88 of file InsertOrderFragmenter.cpp.

References Data_Namespace::DISK_LEVEL, and to_string().

88  {
89  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
90  // memory-resident tables won't have anything on disk
91  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
92  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
93 
94  // data comes like this - database_id, table_id, column_id, fragment_id
95  // but lets sort by database_id, table_id, fragment_id, column_id
96 
97  int fragment_subkey_index = 3;
98  std::sort(chunk_metadata.begin(),
99  chunk_metadata.end(),
100  [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
101  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
102  return pair1.first[3] < pair2.first[3];
103  });
104 
105  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
106  ++chunk_itr) {
107  int cur_column_id = chunk_itr->first[2];
108  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
109 
110  if (fragmentInfoVec_.empty() ||
111  cur_fragment_id != fragmentInfoVec_.back().fragmentId) {
112  maxFragmentId_ = cur_fragment_id;
113  fragmentInfoVec_.emplace_back();
114  fragmentInfoVec_.back().fragmentId = cur_fragment_id;
115  fragmentInfoVec_.back().setPhysicalNumTuples(chunk_itr->second.numElements);
116  numTuples_ += fragmentInfoVec_.back().getPhysicalNumTuples();
117  for (const auto level_size : dataMgr_->levelSizes_) {
118  fragmentInfoVec_.back().deviceIds.push_back(cur_fragment_id % level_size);
119  }
120  fragmentInfoVec_.back().shadowNumTuples =
121  fragmentInfoVec_.back().getPhysicalNumTuples();
122  fragmentInfoVec_.back().physicalTableId = physicalTableId_;
123  fragmentInfoVec_.back().shard = shard_;
124  } else {
125  if (chunk_itr->second.numElements !=
126  fragmentInfoVec_.back().getPhysicalNumTuples()) {
127  throw std::runtime_error(
128  "Inconsistency in num tuples within fragment for table " +
129  std::to_string(physicalTableId_) + ", Column " +
130  std::to_string(cur_column_id) + ". Fragment Tuples: " +
131  std::to_string(fragmentInfoVec_.back().getPhysicalNumTuples()) +
132  ", Chunk Tuples: " + std::to_string(chunk_itr->second.numElements));
133  }
134  }
135  fragmentInfoVec_.back().setChunkMetadata(cur_column_id, chunk_itr->second);
136  }
137  }
138 
139  ssize_t maxFixedColSize = 0;
140 
141  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
142  ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
143  if (size == -1) { // variable length
144  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
145  size = 8; // b/c we use this for string and array indices - gross to have magic
146  // number here
147  }
148  maxFixedColSize = std::max(maxFixedColSize, size);
149  }
150 
151  // this is maximum number of rows assuming everything is fixed length
152  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
153 
154  if (fragmentInfoVec_.size() > 0) {
155  // Now need to get the insert buffers for each column - should be last
156  // fragment
157  int lastFragmentId = fragmentInfoVec_.back().fragmentId;
158  int deviceId =
159  fragmentInfoVec_.back().deviceIds[static_cast<int>(defaultInsertLevel_)];
160  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
161  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
162  insertKey.push_back(colIt->first); // column id
163  insertKey.push_back(lastFragmentId); // fragment id
164  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
165  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
166  if (varLenColInfoIt != varLenColInfo_.end()) {
167  varLenColInfoIt->second = colIt->second.get_buffer()->size();
168  }
169  }
170  }
171 }
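The final clamp guarantees that no fixed-width chunk can outgrow maxChunkSize_: the widest fixed-length column bounds the rows a fragment may hold. A worked example with hypothetical sizes:

 size_t maxChunkSize = 1073741824;  // 1 GB
 ssize_t maxFixedColSize = 8;       // widest fixed-length column, in bytes
 // maxFragmentRows_ = min(maxFragmentRows_, maxChunkSize / maxFixedColSize)
 //                  = min(maxFragmentRows_, 134217728 rows per fragment)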

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 1019 of file UpdelStorage.cpp.

References catalog_, CHECK(), Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

1022  {
1023  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1024  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
1025  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1026  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1027  ++ncol;
1028  if (!cd->isVirtualCol) {
1029  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1030  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1031  ChunkKey chunk_key{
1032  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1033  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1034  &catalog_->getDataMgr(),
1035  chunk_key,
1036  memory_level,
1037  0,
1038  chunk_meta_it->second.numBytes,
1039  chunk_meta_it->second.numElements);
1040  chunks.push_back(chunk);
1041  }
1042  }
1043  }
1044  return chunks;
1045 }

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 101 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

101 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as a string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 106 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo & Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected

Definition at line 48 of file UpdelStorage.cpp.

References CHECK(), and fragmentInfoVec_.

Referenced by compactRows(), updateColumn(), and updateColumns().

48  {
49  auto fragment_it = std::find_if(
50  fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
51  return f.fragmentId == fragment_id;
52  });
53  CHECK(fragment_it != fragmentInfoVec_.end());
54  return *fragment_it;
55 }

TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row sizes of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 624 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

624  {
625  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
626  TableInfo queryInfo;
627  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
628  // right now we don't test predicate, so just return (copy of) all fragments
629  bool fragmentsExist = false;
630  if (fragmentInfoVec_.empty()) {
631  // If we have no fragments add a dummy empty fragment to make the executor
632  // not have separate logic for 0-row tables
633  int maxFragmentId = 0;
634  FragmentInfo emptyFragmentInfo;
635  emptyFragmentInfo.fragmentId = maxFragmentId;
636  emptyFragmentInfo.shadowNumTuples = 0;
637  emptyFragmentInfo.setPhysicalNumTuples(0);
638  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
639  emptyFragmentInfo.physicalTableId = physicalTableId_;
640  emptyFragmentInfo.shard = shard_;
641  queryInfo.fragments.push_back(emptyFragmentInfo);
642  } else {
643  fragmentsExist = true;
644  queryInfo.fragments = fragmentInfoVec_; // makes a copy
645  }
646  readLock.unlock();
647  queryInfo.setPhysicalNumTuples(0);
648  auto partIt = queryInfo.fragments.begin();
649  if (fragmentsExist) {
650  while (partIt != queryInfo.fragments.end()) {
651  if (partIt->getPhysicalNumTuples() == 0) {
652  // this means that a concurrent insert query inserted tuples into a new fragment
653  // but when the query came in we didn't have this fragment. To make sure we don't
654  // mess up the executor we delete this fragment from the metadatamap (fixes
655  // earlier bug found 2015-05-08)
656  partIt = queryInfo.fragments.erase(partIt);
657  } else {
658  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
659  partIt->getPhysicalNumTuples());
660  ++partIt;
661  }
662  }
663  } else {
664  // We added a dummy fragment and know the table is empty
665  queryInfo.setPhysicalNumTuples(0);
666  }
667  return queryInfo;
668 }
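A sketch of typical consumption; note that the returned TableInfo is a copy taken under a read lock, so fragments appended by concurrent inserts after the call are not reflected:

 Fragmenter_Namespace::TableInfo info = fragmenter.getFragmentsForQuery();
 size_t total_rows = 0;
 for (const auto& frag : info.fragments) {
   total_rows += frag.getPhysicalNumTuples();
 }
 // For a non-empty table, total_rows == info.getPhysicalNumTuples().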

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1048 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1049  {
1050  const auto data_buffer = chunk->get_buffer();
1051  const auto data_addr = data_buffer->getMemoryPtr();
1052  const size_t nrows_in_chunk = data_buffer->size();
1053  const size_t ncore = cpu_threads();
1054  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1055  std::vector<std::vector<uint64_t>> deleted_offsets;
1056  deleted_offsets.resize(ncore);
1057  std::vector<std::future<void>> threads;
1058  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1059  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1060  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1061  const auto ithread = rbegin / segsz;
1062  CHECK(ithread < deleted_offsets.size());
1063  deleted_offsets[ithread].reserve(segsz);
1064  for (size_t r = rbegin; r < rend; ++r) {
1065  if (data_addr[r]) {
1066  deleted_offsets[ithread].push_back(r);
1067  }
1068  }
1069  }));
1070  }
1071  wait_cleanup_threads(threads);
1072  std::vector<uint64_t> all_deleted_offsets;
1073  for (size_t i = 0; i < ncore; ++i) {
1074  all_deleted_offsets.insert(
1075  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1076  }
1077  return all_deleted_offsets;
1078 }
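Together with compactRows(), this gives the vacuum flow for deleted rows. A hedged sketch (obtaining deleted_chunk, the Chunk_NS::Chunk holding the table's $deleted$ column for the target fragment, is omitted here):

 const auto offsets = fragmenter.getVacuumOffsets(deleted_chunk);
 if (!offsets.empty()) {
   fragmenter.compactRows(catalog,
                          td,
                          fragment_id,
                          offsets,                    // rows to physically remove
                          Data_Namespace::CPU_LEVEL,
                          updel_roll);                // collects rollback state
 }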

void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 316 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

316  {
317  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
318  try {
319  mapd_unique_lock<mapd_shared_mutex> insertLock(
320  insertMutex_); // prevent two threads from trying to insert into the same table
321  // simultaneously
322 
323  insertDataImpl(insertDataStruct);
324 
325  if (defaultInsertLevel_ ==
326  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
327  dataMgr_->checkpoint(
328  chunkKeyPrefix_[0],
329  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
330  }
331  } catch (...) {
332  int32_t tableEpoch =
333  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
334 
335  // the statement below deletes *this* object!
336  // relying on exception propagation at this stage
337  // until we can sort this out in a cleaner fashion
338  catalog_->setTableEpoch(
339  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
340  throw;
341  }
342 }
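A sketch of the caller's side (ids and data blocks are hypothetical; the DataBlockPtr contents must follow InsertData's layout conventions for each column type):

 Fragmenter_Namespace::InsertData insert_data;
 insert_data.databaseId = db_id;
 insert_data.tableId = table_id;
 insert_data.numRows = num_rows;
 insert_data.columnIds = {1, 2};               // columns being populated
 insert_data.data = {col1_block, col2_block};  // one DataBlockPtr per column

 // Takes the insert lock, appends into fragments, and checkpoints when the
 // default insert level is DISK_LEVEL.
 fragmenter.insertData(insert_data);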

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 430 of file InsertOrderFragmenter.cpp.

References CHECK(), CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, Fragmenter_Namespace::InsertData::replicate_count, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

430  {
431  // populate deleted system column if it should exists, as it will not come from client
432  // Do not add this magical column in the replicate ALTER TABLE ADD route as
433  // it is not needed and will cause issues
434  std::unique_ptr<int8_t[]> data_for_deleted_column;
435  for (const auto& cit : columnMap_) {
436  if (cit.second.get_column_desc()->isDeletedCol &&
437  insertDataStruct.replicate_count == 0) {
438  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
439  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
440  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
441  insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
442  insertDataStruct.columnDescriptors[cit.first] = cit.second.get_column_desc();
443  break;
444  }
445  }
446  // MAT we need to add a removal of the empty column we pushed onto here
447  // for upstream safety. Should not be a problem but need to fix.
448 
449  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
450  for (const auto columnId : insertDataStruct.columnIds) {
451  if (columnMap_.end() == columnMap_.find(columnId)) {
452  columnMap_.emplace(
453  columnId, Chunk_NS::Chunk(insertDataStruct.columnDescriptors.at(columnId)));
454  }
455  }
456 
457  // when replicate (add) column(s), this path seems wont work; go separate route...
458  if (insertDataStruct.replicate_count > 0) {
459  replicateData(insertDataStruct);
460  return;
461  }
462 
463  std::unordered_map<int, int> inverseInsertDataColIdMap;
464  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
465  inverseInsertDataColIdMap.insert(
466  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
467  }
468 
469  size_t numRowsLeft = insertDataStruct.numRows;
470  size_t numRowsInserted = 0;
471  vector<DataBlockPtr> dataCopy =
472  insertDataStruct.data; // bc append data will move ptr forward and this violates
473  // constness of InsertData
474  if (numRowsLeft <= 0) {
475  return;
476  }
477 
478  FragmentInfo* currentFragment = 0;
479 
480  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
481  currentFragment = createNewFragment(defaultInsertLevel_);
482  } else {
483  currentFragment = &(fragmentInfoVec_.back());
484  }
485  size_t startFragment = fragmentInfoVec_.size() - 1;
486 
487  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
488  // loop until done inserting all rows
489  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
490  size_t rowsLeftInCurrentFragment =
491  maxFragmentRows_ - currentFragment->shadowNumTuples;
492  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
493  if (rowsLeftInCurrentFragment != 0) {
494  for (auto& varLenColInfoIt : varLenColInfo_) {
495  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
496  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
497  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
498  if (insertIdIt != inverseInsertDataColIdMap.end()) {
499  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
500  numRowsToInsert = std::min(
501  numRowsToInsert,
502  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
503  numRowsToInsert,
504  numRowsInserted,
505  bytesLeft));
506  }
507  }
508  }
509 
510  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
511  currentFragment = createNewFragment(defaultInsertLevel_);
512  if (numRowsInserted == 0) {
513  startFragment++;
514  }
515  rowsLeftInCurrentFragment = maxFragmentRows_;
516  for (auto& varLenColInfoIt : varLenColInfo_) {
517  varLenColInfoIt.second = 0; // reset byte counter
518  }
519  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
520  for (auto& varLenColInfoIt : varLenColInfo_) {
521  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
522  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
523  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
524  if (insertIdIt != inverseInsertDataColIdMap.end()) {
525  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
526  numRowsToInsert = std::min(
527  numRowsToInsert,
528  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
529  numRowsToInsert,
530  numRowsInserted,
531  bytesLeft));
532  }
533  }
534  }
535 
536  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
537  // never be able to insert anything
538 
539  // for each column, append the data in the appropriate insert buffer
540  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
541  int columnId = insertDataStruct.columnIds[i];
542  auto colMapIt = columnMap_.find(columnId);
543  CHECK(colMapIt != columnMap_.end());
544  currentFragment->shadowChunkMetadataMap[columnId] =
545  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
546  auto varLenColInfoIt = varLenColInfo_.find(columnId);
547  if (varLenColInfoIt != varLenColInfo_.end()) {
548  varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
549  }
550  }
551  if (hasMaterializedRowId_) {
552  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
553  currentFragment->shadowNumTuples;
554  int64_t* rowIdData = new int64_t[numRowsToInsert];
555  for (size_t i = 0; i < numRowsToInsert; ++i) {
556  rowIdData[i] = i + startId;
557  }
558  DataBlockPtr rowIdBlock;
559  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData);
560  auto colMapIt = columnMap_.find(rowIdColId_);
561  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
562  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
563  delete[] rowIdData;
564  }
565 
566  currentFragment->shadowNumTuples =
567  fragmentInfoVec_.back().getPhysicalNumTuples() + numRowsToInsert;
568  numRowsLeft -= numRowsToInsert;
569  numRowsInserted += numRowsToInsert;
570  }
571  {
572  // Only take the fragment info lock when updating fragment info map. Otherwise,
573  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
574  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
575  // COPY_FROM waits for TableWriteLock
576  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
577  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
578  partIt != fragmentInfoVec_.end();
579  ++partIt) {
580  partIt->setPhysicalNumTuples(partIt->shadowNumTuples);
581  partIt->setChunkMetadataMap(partIt->shadowChunkMetadataMap);
582  }
583  }
584  numTuples_ += insertDataStruct.numRows;
585  dropFragmentsToSize(maxRows_);
586 }
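For tables with a materialized rowid column, the listing assigns each appended row a consecutive id starting at maxFragmentRows_ * fragmentId + shadowNumTuples, which keeps rowids unique across fragment slots. A worked example with hypothetical numbers:

 // maxFragmentRows_ = 32000000, fragmentId = 2, shadowNumTuples = 1500:
 size_t startId = 32000000 * 2 + 1500;  // = 64001500
 // The batch's rowids are startId, startId + 1, ...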

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 344 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

344  {
345  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
346  mapd_unique_lock<mapd_shared_mutex> insertLock(
347  insertMutex_); // prevent two threads from trying to insert into the same table
348  // simultaneously
349  insertDataImpl(insertDataStruct);
350 }

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter &  )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 352 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, CHECK(), Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

352  {
353  size_t numRowsLeft = insertDataStruct.numRows;
354  for (auto& fragmentInfo : fragmentInfoVec_) {
355  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysical();
356  auto numRowsToInsert = fragmentInfo.getPhysicalNumTuples(); // not getNumTuples()
357  size_t numRowsCanBeInserted;
358  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
359  if (insertDataStruct.bypass[i]) {
360  continue;
361  }
362  auto columnId = insertDataStruct.columnIds[i];
363  auto colDesc = insertDataStruct.columnDescriptors.at(columnId);
364  CHECK(columnMap_.find(columnId) != columnMap_.end());
365 
366  ChunkKey chunkKey = chunkKeyPrefix_;
367  chunkKey.push_back(columnId);
368  chunkKey.push_back(fragmentInfo.fragmentId);
369 
370  auto colMapIt = columnMap_.find(columnId);
371  auto& chunk = colMapIt->second;
372  if (chunk.isChunkOnDevice(
373  dataMgr_,
374  chunkKey,
375  defaultInsertLevel_,
376  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)])) {
377  dataMgr_->deleteChunksWithPrefix(chunkKey);
378  }
379  chunk.createChunkBuffer(
380  dataMgr_,
381  chunkKey,
382  defaultInsertLevel_,
383  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)]);
384  chunk.init_encoder();
385 
386  try {
387  DataBlockPtr dataCopy = insertDataStruct.data[i];
388  auto size = colDesc->columnType.get_size();
389  if (0 > size) {
390  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
391  varLenColInfo_[columnId] = 0;
392  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
393  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
394  } else {
395  numRowsCanBeInserted = maxChunkSize_ / size;
396  }
397 
398  // FIXME: abort a case in which new column is wider than existing columns
399  if (numRowsCanBeInserted < numRowsToInsert) {
400  throw std::runtime_error("new column '" + colDesc->columnName +
401  "' wider than existing columns is not supported");
402  }
403 
404  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
405  {
406  std::unique_lock<std::mutex> lck(*fragmentInfo.mutex_access_inmem_states);
407  fragmentInfo.shadowChunkMetadataMap[columnId] = chunkMetadata;
408  }
409 
410  // update total size of var-len column in (actually the last) fragment
411  if (0 > size) {
412  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
413  varLenColInfo_[columnId] = chunk.get_buffer()->size();
414  }
415  } catch (...) {
416  dataMgr_->deleteChunksWithPrefix(chunkKey);
417  throw;
418  }
419  }
420  numRowsLeft -= numRowsToInsert;
421  }
422  CHECK(0 == numRowsLeft);
423 
424  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
425  for (auto& fragmentInfo : fragmentInfoVec_) {
426  fragmentInfo.setChunkMetadataMap(fragmentInfo.shadowChunkMetadataMap);
427  }
428 }

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 233 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), ChunkMetadata::chunkStats, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), LOG, showChunk(), VLOG, and logger::WARNING.

235  {
241  if (shard_ >= 0) {
242  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
243  }
244 
245  CHECK(cd);
246  const auto column_id = cd->columnId;
247  const auto col_itr = columnMap_.find(column_id);
248  CHECK(col_itr != columnMap_.end());
249 
250  for (auto& fragment : fragmentInfoVec_) {
251  auto stats_itr = stats_map.find(fragment.fragmentId);
252  if (stats_itr != stats_map.end()) {
253  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(column_id);
254  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
255  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
256  physicalTableId_,
257  column_id,
258  fragment.fragmentId};
259  auto chunk = Chunk_NS::Chunk::getChunk(cd,
260  &catalog_->getDataMgr(),
261  chunk_key,
262  Data_Namespace::DISK_LEVEL,
263  0,
264  chunk_meta_it->second.numBytes,
265  chunk_meta_it->second.numElements);
266  auto buf = chunk->get_buffer();
267  CHECK(buf);
268  auto encoder = buf->encoder.get();
269  if (!encoder) {
270  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
271  }
272 
273  auto chunk_stats = stats_itr->second;
274 
275  ChunkMetadata old_chunk_metadata;
276  encoder->getMetadata(old_chunk_metadata);
277  auto& old_chunk_stats = old_chunk_metadata.chunkStats;
278 
279  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
280  // Use the logical type to display data, since the encoding should be ignored
281  const auto logical_ti = get_logical_type_info(cd->columnType);
282  if (!didResetStats) {
283  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
284  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
285  << DatumToString(chunk_stats.max, logical_ti);
286  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
287  << DatumToString(chunk_stats.min, logical_ti);
288  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
289  continue; // move to next fragment
290  }
291 
292  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
293  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
294  << DatumToString(chunk_stats.max, logical_ti);
295  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
296  << DatumToString(chunk_stats.min, logical_ti);
297  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
298 
299  // Reset fragment metadata map and set buffer to dirty
300  ChunkMetadata new_metadata;
301  // Run through fillChunkStats to ensure any transformations to the raw metadata
302  // values get applied (e.g. for date in days)
303  encoder->getMetadata(new_metadata);
304  fragment.setChunkMetadata(column_id, new_metadata);
305  fragment.shadowChunkMetadataMap =
306  fragment.getChunkMetadataMap(); // TODO(adb): needed?
307  buf->setDirty();
308  } else {
309  LOG(WARNING) << "No chunk stats update found for fragment " << fragment.fragmentId
310  << ", table " << physicalTableId_ << ", "
311  << ", column " << column_id;
312  }
313  }
314 }
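A sketch of the caller's side (values hypothetical): stats are keyed by fragment id and, per the warning above, the caller is responsible for holding table-level locks around the call.

 std::unordered_map<int, ChunkStats> stats_map;
 ChunkStats new_stats;
 new_stats.min.bigintval = 0;     // new minimum for the chunk
 new_stats.max.bigintval = 1000;  // new maximum
 new_stats.has_nulls = false;
 stats_map[fragment_id] = new_stats;

 fragmenter.updateChunkStats(cd, stats_map);  // cd: the column's descriptor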

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const std::string &  tab_name,
const std::string &  col_name,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
static

Definition at line 63 of file UpdelStorage.cpp.

References CHECK(), Catalog_Namespace::Catalog::getMetadataForColumn(), and Catalog_Namespace::Catalog::getMetadataForTable().

Referenced by updateColumn().

71  {
72  const auto td = catalog->getMetadataForTable(tab_name);
73  CHECK(td);
74  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
75  CHECK(cd);
76  td->fragmenter->updateColumn(catalog,
77  td,
78  cd,
79  fragment_id,
80  frag_offsets,
81  rhs_values,
82  rhs_type,
83  memory_level,
84  updel_roll);
85 }
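The static overload is a by-name convenience: it resolves the table and column descriptors through the catalog and forwards to the table's fragmenter. A hedged sketch (names and values hypothetical):

 // Broadcast the value 1 to the given rows of column "f" in table "t".
 Fragmenter_Namespace::InsertOrderFragmenter::updateColumn(
     catalog,
     "t",                              // table name
     "f",                              // column name
     fragment_id,
     frag_offsets,                     // std::vector<uint64_t> of row offsets
     {ScalarTargetValue(int64_t(1))},  // single RHS value, broadcast to all rows
     rhs_type,                         // SQLTypeInfo of the RHS
     Data_Namespace::CPU_LEVEL,
     updel_roll);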

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 610 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfoCore< TYPE_FACET_PACK >::get_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfoFromId(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_boolean(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

618  {
619  updel_roll.catalog = catalog;
620  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
621  updel_roll.memoryLevel = memory_level;
622 
623  const size_t ncore = cpu_threads();
624  const auto nrow = frag_offsets.size();
625  const auto n_rhs_values = rhs_values.size();
626  if (0 == nrow) {
627  return;
628  }
629  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
630 
631  auto& fragment = getFragmentInfoFromId(fragment_id);
632  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
633  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
634  ChunkKey chunk_key{
635  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
636  auto chunk = Chunk_NS::Chunk::getChunk(cd,
637  &catalog->getDataMgr(),
638  chunk_key,
639  Data_Namespace::CPU_LEVEL,
640  0,
641  chunk_meta_it->second.numBytes,
642  chunk_meta_it->second.numElements);
643 
644  std::vector<int8_t> has_null_per_thread(ncore, 0);
645  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
646  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
647  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
648  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
649 
650  // parallel update elements
651  std::vector<std::future<void>> threads;
652 
653  const auto segsz = (nrow + ncore - 1) / ncore;
654  auto dbuf = chunk->get_buffer();
655  auto dbuf_addr = dbuf->getMemoryPtr();
656  dbuf->setUpdated();
657  {
658  std::lock_guard<std::mutex> lck(updel_roll.mutex);
659  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
660  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
661  }
662 
663  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
664  cd->tableId,
665  cd->columnId,
666  fragment.fragmentId};
667  updel_roll.dirtyChunkeys.insert(chunkey);
668  }
669  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
670  threads.emplace_back(std::async(
671  std::launch::async,
672  [=,
673  &has_null_per_thread,
674  &min_int64t_per_thread,
675  &max_int64t_per_thread,
676  &min_double_per_thread,
677  &max_double_per_thread,
678  &frag_offsets,
679  &rhs_values] {
680  SQLTypeInfo lhs_type = cd->columnType;
681 
682  // !! not sure if this is a undocumented convention or a bug, but for a sharded
683  // table the dictionary id of a encoded string column is not specified by
684  // comp_param in physical table but somehow in logical table :) comp_param in
685  // physical table is always 0, so need to adapt accordingly...
686  auto cdl = (shard_ < 0)
687  ? cd
688  : catalog->getMetadataForColumn(
689  catalog->getLogicalTableId(td->tableId), cd->columnId);
690  CHECK(cdl);
691  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
692  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
693  lhs_type, &decimalOverflowValidator);
694  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
695  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
696  lhs_type, &dateDaysOverflowValidator);
697 
698  StringDictionary* stringDict{nullptr};
699  if (lhs_type.is_string()) {
700  CHECK(kENCODING_DICT == lhs_type.get_compression());
701  auto dictDesc = const_cast<DictDescriptor*>(
702  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
703  CHECK(dictDesc);
704  stringDict = dictDesc->stringDict.get();
705  CHECK(stringDict);
706  }
707 
708  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
709  const auto roffs = frag_offsets[r];
710  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
711  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
712  ScalarTargetValue sv2;
713 
714  // Subtle here is on the two cases of string-to-string assignments, when
715  // upstream passes RHS string as a string index instead of a preferred "real
716  // string".
717  // case #1. For "SET str_col = str_literal", it is hard to resolve temp str
718  // index
719  // in this layer, so if upstream passes a str idx here, an
720  // exception is thrown.
721  // case #2. For "SET str_col1 = str_col2", RHS str idx is converted to LHS
722  // str idx.
723  if (rhs_type.is_string()) {
724  if (const auto vp = boost::get<int64_t>(sv)) {
725  auto dictDesc = const_cast<DictDescriptor*>(
726  catalog->getMetadataForDict(rhs_type.get_comp_param()));
727  if (nullptr == dictDesc) {
728  throw std::runtime_error(
729  "UPDATE does not support cast from string literal to string "
730  "column.");
731  }
732  auto stringDict = dictDesc->stringDict.get();
733  CHECK(stringDict);
734  sv2 = NullableString(stringDict->getString(*vp));
735  sv = &sv2;
736  }
737  }
738 
739  if (const auto vp = boost::get<int64_t>(sv)) {
740  auto v = *vp;
741  if (lhs_type.is_string()) {
742  throw std::runtime_error("UPDATE does not support cast to string.");
743  }
744  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
745  if (lhs_type.is_decimal()) {
746  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
747  int64_t decimal_val;
748  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
749  tabulate_metadata(lhs_type,
750  min_int64t_per_thread[c],
751  max_int64t_per_thread[c],
752  has_null_per_thread[c],
753  (v == inline_int_null_value<int64_t>() &&
754  lhs_type.get_notnull() == false)
755  ? v
756  : decimal_val);
757  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
758  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
759  if (positive_v_and_negative_d || negative_v_and_positive_d) {
760  throw std::runtime_error(
761  "Data conversion overflow on " + std::to_string(v) +
762  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
763  std::to_string(rhs_type.get_scale()) + ") to (" +
764  std::to_string(lhs_type.get_dimension()) + ", " +
765  std::to_string(lhs_type.get_scale()) + ")");
766  }
767  } else if (is_integral(lhs_type)) {
768  if (lhs_type.is_date_in_days()) {
769  // Store meta values in seconds
770  if (lhs_type.get_size() == 2) {
771  nullAwareDateOverflowValidator.validate<int16_t>(v);
772  } else {
773  nullAwareDateOverflowValidator.validate<int32_t>(v);
774  }
775  int64_t days;
776  get_scalar<int64_t>(data_ptr, lhs_type, days);
777  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
778  tabulate_metadata(lhs_type,
779  min_int64t_per_thread[c],
780  max_int64t_per_thread[c],
781  has_null_per_thread[c],
782  (v == inline_int_null_value<int64_t>() &&
783  lhs_type.get_notnull() == false)
784  ? NullSentinelSupplier()(lhs_type, v)
785  : seconds);
786  } else {
787  int64_t target_value;
788  if (rhs_type.is_decimal()) {
789  target_value = round(decimal_to_double(rhs_type, v));
790  } else {
791  target_value = v;
792  }
793  tabulate_metadata(lhs_type,
794  min_int64t_per_thread[c],
795  max_int64t_per_thread[c],
796  has_null_per_thread[c],
797  target_value);
798  }
799  } else {
801  lhs_type,
802  min_double_per_thread[c],
803  max_double_per_thread[c],
804  has_null_per_thread[c],
805  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
806  }
807  } else if (const auto vp = boost::get<double>(sv)) {
808  auto v = *vp;
809  if (lhs_type.is_string()) {
810  throw std::runtime_error("UPDATE does not support cast to string.");
811  }
812  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
813  if (lhs_type.is_integer()) {
814  tabulate_metadata(lhs_type,
815  min_int64t_per_thread[c],
816  max_int64t_per_thread[c],
817  has_null_per_thread[c],
818  int64_t(v));
819  } else if (lhs_type.is_fp()) {
820  tabulate_metadata(lhs_type,
821  min_double_per_thread[c],
822  max_double_per_thread[c],
823  has_null_per_thread[c],
824  double(v));
825  } else {
826  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
827  "LHS with a floating RHS.";
828  }
829  } else if (const auto vp = boost::get<float>(sv)) {
830  auto v = *vp;
831  if (lhs_type.is_string()) {
832  throw std::runtime_error("UPDATE does not support cast to string.");
833  }
834  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
835  if (lhs_type.is_integer()) {
836  tabulate_metadata(lhs_type,
837  min_int64t_per_thread[c],
838  max_int64t_per_thread[c],
839  has_null_per_thread[c],
840  int64_t(v));
841  } else {
842  tabulate_metadata(lhs_type,
843  min_double_per_thread[c],
844  max_double_per_thread[c],
845  has_null_per_thread[c],
846  double(v));
847  }
848  } else if (const auto vp = boost::get<NullableString>(sv)) {
849  const auto s = boost::get<std::string>(vp);
850  const auto sval = s ? *s : std::string("");
851  if (lhs_type.is_string()) {
852  decltype(stringDict->getOrAdd(sval)) sidx;
853  {
854  std::unique_lock<std::mutex> lock(temp_mutex_);
855  sidx = stringDict->getOrAdd(sval);
856  }
857  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
858  tabulate_metadata(lhs_type,
859  min_int64t_per_thread[c],
860  max_int64t_per_thread[c],
861  has_null_per_thread[c],
862  int64_t(sidx));
863  } else if (sval.size() > 0) {
864  auto dval = std::atof(sval.data());
865  if (lhs_type.is_boolean()) {
866  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
867  } else if (lhs_type.is_time()) {
868  throw std::runtime_error(
869  "Date/Time/Timestamp update not supported through translated "
870  "string path.");
871  }
872  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
873  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
874  tabulate_metadata(lhs_type,
875  min_double_per_thread[c],
876  max_double_per_thread[c],
877  has_null_per_thread[c],
878  double(dval));
879  } else {
880  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
881  tabulate_metadata(lhs_type,
882  min_int64t_per_thread[c],
883  max_int64t_per_thread[c],
884  has_null_per_thread[c],
885  int64_t(dval));
886  }
887  } else {
888  put_null(data_ptr, lhs_type, cd->columnName);
889  has_null_per_thread[c] = true;
890  }
891  } else {
892  CHECK(false);
893  }
894  }
895  }));
896  if (threads.size() >= (size_t)cpu_threads()) {
897  wait_cleanup_threads(threads);
898  }
899  }
900  wait_cleanup_threads(threads);
901 
902  // for unit test
904  if (cd->isDeletedCol) {
905  const auto deleted_offsets = getVacuumOffsets(chunk);
906  if (deleted_offsets.size() > 0) {
907  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
908  return;
909  }
910  }
911  }
912  bool has_null_per_chunk{false};
913  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
914  double min_double_per_chunk{std::numeric_limits<double>::max()};
915  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
916  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
917  for (size_t c = 0; c < ncore; ++c) {
918  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
919  max_double_per_chunk =
920  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
921  min_double_per_chunk =
922  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
923  max_int64t_per_chunk =
924  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
925  min_int64t_per_chunk =
926  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
927  }
929  fragment,
930  chunk,
931  has_null_per_chunk,
932  max_double_per_chunk,
933  min_double_per_chunk,
934  max_int64t_per_chunk,
935  min_int64t_per_chunk,
936  cd->columnType,
937  updel_roll);
938 }
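
The per-row work above is split across cpu_threads() workers, each of which tabulates min/max/has-null into its own slot of the *_per_thread vectors; once all futures complete, the slots are reduced into per-chunk statistics, so the hot loop needs no locking. A minimal, self-contained sketch of that pattern (toy data and names, not OmniSciDB code):

#include <algorithm>
#include <cstdint>
#include <future>
#include <limits>
#include <vector>

int main() {
  const size_t ncore = 4;
  std::vector<int64_t> values = {7, -3, 42, 0, 5, -8, 13, 2};

  // One min/max slot per worker; no synchronization needed on the hot path.
  std::vector<int64_t> min_per_thread(ncore, std::numeric_limits<int64_t>::max());
  std::vector<int64_t> max_per_thread(ncore, std::numeric_limits<int64_t>::min());

  const size_t segsz = (values.size() + ncore - 1) / ncore;  // ceil-divide rows
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0, c = 0; rbegin < values.size(); ++c, rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [&, rbegin, c] {
      for (size_t r = rbegin; r < std::min(rbegin + segsz, values.size()); ++r) {
        min_per_thread[c] = std::min(min_per_thread[c], values[r]);
        max_per_thread[c] = std::max(max_per_thread[c], values[r]);
      }
    }));
  }
  for (auto& t : threads) {
    t.wait();
  }

  // Reduce the per-thread slots into per-chunk stats, as updateColumn does.
  int64_t min_per_chunk = std::numeric_limits<int64_t>::max();
  int64_t max_per_chunk = std::numeric_limits<int64_t>::min();
  for (size_t c = 0; c < ncore; ++c) {
    min_per_chunk = std::min(min_per_chunk, min_per_thread[c]);
    max_per_chunk = std::max(max_per_chunk, max_per_thread[c]);
  }
  return (min_per_chunk == -8 && max_per_chunk == 42) ? 0 : 1;
}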

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 87 of file UpdelStorage.cpp.

References updateColumn().

{
  updateColumn(catalog,
               td,
               cd,
               fragment_id,
               frag_offsets,
               std::vector<ScalarTargetValue>(1, rhs_value),
               rhs_type,
               memory_level,
               updel_roll);
}
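
This overload simply wraps the scalar RHS in a one-element vector and forwards to the vector overload, which broadcasts a single RHS value across all frag_offsets. A toy sketch of the same forwarding pattern (hypothetical names, not the real API):

#include <iostream>
#include <vector>

// Vector path: the update loop is written once, against a vector of values.
static void updateValues(const std::vector<int64_t>& rhs_values) {
  std::cout << "updating with " << rhs_values.size() << " value(s)\n";
}

// Scalar overload: wrap the single RHS value and reuse the vector path,
// mirroring the forwarding done at line 87 of UpdelStorage.cpp.
static void updateValues(const int64_t rhs_value) {
  updateValues(std::vector<int64_t>(1, rhs_value));
}

int main() {
  updateValues(42);  // prints "updating with 1 value(s)"
}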

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 940 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and ColumnDescriptor::tableId.

Referenced by compactRows(), and updateColumn().

{
  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
  auto key = std::make_pair(td, &fragment);
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  if (0 == updel_roll.numTuples.count(key)) {
    updel_roll.numTuples[key] = fragment.shadowNumTuples;
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];

  auto buffer = chunk->get_buffer();
  const auto& lhs_type = cd->columnType;

  auto update_stats = [&encoder = buffer->encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
    if (has_null) {
      encoder->updateStats(decltype(min)(), true);
    }
    if (max < min) {
      return;
    }
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
  };

  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_fp()) {
    update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_decimal()) {
    update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
                 (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
                 has_null_per_chunk);
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  }
  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);

  // Removed as @alex suggested; kept commented in case we revisit this once the
  // vacuum code is introduced. fragment.invalidateChunkMetadataMap();
}
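
The update_stats lambda above records nulls first, then applies min/max to the chunk's encoder only when the tabulated range is non-empty; max < min means the initial sentinel values were never updated, i.e. every touched value was null. A self-contained sketch of that logic against a toy encoder (not the real Encoder API):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

struct ToyEncoder {
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();
  bool has_nulls = false;
  void updateStats(int64_t v, bool is_null) {
    if (is_null) {
      has_nulls = true;
      return;
    }
    min = std::min(min, v);
    max = std::max(max, v);
  }
};

static void update_stats(ToyEncoder& encoder, int64_t min, int64_t max, bool has_null) {
  if (has_null) {
    encoder.updateStats(int64_t(), true);  // record null presence first
  }
  if (max < min) {
    return;  // empty range: every updated value was null
  }
  encoder.updateStats(min, false);
  encoder.updateStats(max, false);
}

int main() {
  ToyEncoder e;
  update_stats(e, /*min=*/-5, /*max=*/9, /*has_null=*/true);
  assert(e.min == -5 && e.max == 9 && e.has_nulls);
  return 0;
}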

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 300 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), Executor::getExecutor(), getFragmentInfoFromId(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

{
  if (!is_feature_enabled<VarlenUpdates>()) {
    throw std::runtime_error("varlen UPDATE path not enabled.");
  }

  updelRoll.is_varlen_update = true;
  updelRoll.catalog = catalog;
  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updelRoll.memoryLevel = memoryLevel;

  size_t num_entries = sourceDataProvider.getEntryCount();
  size_t num_rows = sourceDataProvider.getRowCount();

  if (0 == num_rows) {
    // bail out early
    return;
  }

  TargetValueConverterFactory factory;

  auto& fragment = getFragmentInfoFromId(fragmentId);
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
  std::shared_ptr<Executor> executor;

  if (g_enable_experimental_string_functions) {
    executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
  }

  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->get_column_desc();

    if (chunk_cd->isDeletedCol) {
      deletedChunk = chunk;
      continue;
    }

    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });

    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);

      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];

      ConverterCreateParameter param{
          num_rows,
          *catalog,
          sourceDataMetaInfo,
          targetDescriptor,
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
          sourceDataProvider.getLiteralDictionary(),
          g_enable_experimental_string_functions
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().get_comp_param(),
                    executor->getRowSetMemoryOwner(),
                    true)
              : nullptr};
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);

      if (targetDescriptor->columnType.is_geometry()) {
        // geometry columns are composites
        // need to skip chunks, depending on geo type
        switch (targetDescriptor->columnType.get_type()) {
          case kMULTIPOLYGON:
            indexOfChunk += 5;
            break;
          case kPOLYGON:
            indexOfChunk += 4;
            break;
          case kLINESTRING:
            indexOfChunk += 2;
            break;
          case kPOINT:
            indexOfChunk += 1;
            break;
          default:
            CHECK(false);  // not supported
        }
      }
    } else {
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;

        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          // the logical geo column is a string column
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }

        chunkConverters.push_back(std::move(converter));

      } else if (chunk_cd->columnType.is_date_in_days()) {
        /* Q: Why do we need this?
           A: On the variable-length update path we move the chunk content of a
           column without decoding it. Because that content passes through
           DateDaysEncoder again, the expected value should be in seconds, but
           here it is in days. DateChunkConverter therefore scales the chunk
           values to seconds, which DateDaysEncoder then re-encodes in days.
        */
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
        } else {
          CHECK(false);
        }
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();

        if (logical_type.is_string()) {
          // for dicts -> logical = physical
          logical_size = physical_size;
        }

        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        } else {
          CHECK(false);  // unknown
        }

        chunkConverters.push_back(std::move(converter));
      }
    }
  }

  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;

  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
                   deletedChunk->get_column_desc()->tableId,
                   deletedChunk->get_column_desc()->columnId,
                   fragment.fragmentId};
  updelRoll.dirtyChunkeys.insert(chunkey);
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());

  std::atomic<size_t> row_idx{0};

  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        &chunkConverters,
                        &deletedChunkBuffer,
                        &row_idx](size_t indexOfEntry) -> void {
    // convert the source data
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
    if (row.empty()) {
      return;
    }

    size_t indexOfRow = row_idx.fetch_add(1);

    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      }
    }

    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);

    // convert the remaining chunks
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
    }

    // now mark the row as deleted
    deletedChunkBuffer[indexInChunkBuffer] = true;
  };

  bool can_go_parallel = num_rows > 20000;

  if (can_go_parallel) {
    const size_t num_worker_threads = cpu_threads();
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, num_rows);
      worker_threads.push_back(std::async(
          std::launch::async,
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
              row_converter(indexOfRow);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : worker_threads) {
      child.wait();
    }

  } else {
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
    }
  }

  InsertData insert_data;
  insert_data.databaseId = catalog->getCurrentDB().dbId;
  insert_data.tableId = td->tableId;

  for (size_t i = 0; i < chunkConverters.size(); i++) {
    chunkConverters[i]->addDataBlocksToInsertData(insert_data);
    continue;
  }

  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
    if (sourceDataConverters[i]) {
      sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
    }
    continue;
  }

  insert_data.numRows = num_rows;
  insertDataNoCheckpoint(insert_data);

  // update metadata
  if (!deletedChunk->get_buffer()->has_encoder) {
    deletedChunk->init_encoder();
  }
  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);

  auto& shadowDeletedChunkMeta =
      fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
  if (shadowDeletedChunkMeta.numElements >
      deletedChunk->get_buffer()->encoder->getNumElems()) {
    // the append will have populated the shadow metadata; otherwise keep the
    // existing number of elements
    deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
  }
  deletedChunk->get_buffer()->setUpdated();
}
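
For scalar columns not targeted by the update, the nested ladder above picks a ScalarChunkConverter<PHYSICAL, LOGICAL> from the chunk's physical element width and the logical type's width, widening values as they are copied out. A condensed, runnable sketch of that dispatch, using toy function pointers in place of the converter classes:

#include <cstdint>
#include <cstdio>

// Copy one element, widening from the physical (stored) width to the logical width.
template <typename SOURCE, typename TARGET>
static void convert_element(const void* src, void* dst) {
  *static_cast<TARGET*>(dst) =
      static_cast<TARGET>(*static_cast<const SOURCE*>(src));
}

// Pick a converter for a (physical, logical) size pair, as the nested
// if/else ladder in updateColumns does with ScalarChunkConverter.
using Converter = void (*)(const void*, void*);
static Converter pick_converter(int physical_size, int logical_size) {
  if (physical_size == 4 && logical_size == 8) {
    return convert_element<int32_t, int64_t>;
  }
  if (physical_size == 2 && logical_size == 8) {
    return convert_element<int16_t, int64_t>;
  }
  return convert_element<int64_t, int64_t>;  // 8 -> 8 and the remaining cases
}

int main() {
  const int32_t narrow = -7;
  int64_t wide = 0;
  pick_converter(4, 8)(&narrow, &wide);
  std::printf("%lld\n", static_cast<long long>(wide));  // prints -7
}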

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 995 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

{
  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  if (updel_roll.chunkMetadata.count(key)) {
    auto& fragmentInfo = *key.second;
    const auto& chunkMetadata = updel_roll.chunkMetadata[key];
    fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
    fragmentInfo.setChunkMetadataMap(chunkMetadata);
    fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
    fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
    // TODO(ppan): When fragment-level compaction is enabled, the following code
    // should suffice. Until then (i.e. with the existing code), we'll revert to
    // updating InsertOrderFragmenter::varLenColInfo_
    /*
    for (const auto cit : chunkMetadata) {
      const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
      if (cd.columnType.get_size() < 0)
        fragmentInfo.varLenColInfox[cd.columnId] = cit.second.numBytes;
    }
    */
  }
}
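
The commit above is a straight copy of the maps staged in the UpdelRoll into the fragment's live and shadow metadata under fragmentInfoMutex_, so concurrent readers never observe a half-applied update. A minimal sketch of that stage-then-commit shape (toy types, not OmniSciDB code):

#include <map>
#include <mutex>

struct FragmentMeta {
  std::map<int, int> chunk_metadata;  // column id -> (toy) metadata
  size_t num_tuples = 0;
};

struct Staged {  // plays the role of the UpdelRoll's staged maps
  std::map<int, int> chunk_metadata;
  size_t num_tuples = 0;
};

static std::mutex fragment_info_mutex;

// Copy the staged state into the live metadata atomically w.r.t. readers
// that take the same lock.
static void commit(FragmentMeta& live, const Staged& staged) {
  std::lock_guard<std::mutex> write_lock(fragment_info_mutex);
  live.chunk_metadata = staged.chunk_metadata;
  live.num_tuples = staged.num_tuples;
}

int main() {
  FragmentMeta live;
  Staged staged{{{1, 99}}, 1000};
  commit(live, staged);
  return live.num_tuples == 1000 ? 0 : 1;
}
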
auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1117 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

{
  const auto cd = chunk->get_column_desc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->get_buffer();
  auto data_addr = data_buffer->getMemoryPtr();
  auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}
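
The loop above walks the sorted list of deleted row offsets once, and for each surviving block of rows between two deletions memmoves the block toward the front, preserving row order. The same single-pass compaction on a plain vector (a self-contained sketch, not OmniSciDB code):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Drop the rows at dead_offsets (assumed sorted ascending) in place,
// returning the number of bytes kept.
static size_t vacuum_fixlen(std::vector<int32_t>& rows,
                            const std::vector<uint64_t>& dead_offsets) {
  const size_t element_size = sizeof(int32_t);
  auto* addr = reinterpret_cast<char*>(rows.data());
  int64_t keep_from = 0;  // head of the next block of rows to keep
  int64_t fill_at = 0;    // where that block should land
  size_t bytes_kept = 0;
  for (size_t i = 0; i <= dead_offsets.size(); ++i) {
    const bool last = (i == dead_offsets.size());
    const int64_t next_dead = last ? int64_t(rows.size()) : int64_t(dead_offsets[i]);
    const int64_t nrows_to_keep = next_dead - keep_from;
    if (nrows_to_keep > 0) {
      if (fill_at != keep_from) {
        // move the current surviving block toward the front
        std::memmove(addr + fill_at * element_size,
                     addr + keep_from * element_size,
                     nrows_to_keep * element_size);
      }
      fill_at += nrows_to_keep;
      bytes_kept += nrows_to_keep * element_size;
    }
    keep_from = next_dead + 1;  // skip over the dead row
  }
  rows.resize(bytes_kept / element_size);
  return bytes_kept;
}

int main() {
  std::vector<int32_t> rows = {10, 11, 12, 13, 14, 15};
  vacuum_fixlen(rows, {1, 4});  // drop rows 1 and 4
  assert((rows == std::vector<int32_t>{10, 12, 13, 15}));
}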


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1153 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

{
  auto data_buffer = chunk->get_buffer();
  auto index_buffer = chunk->get_index_buf();
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
  auto index_array = (StringOffsetT*)indices_addr;
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  size_t nbytes_var_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
          index_array[irow_of_blk_to_keep];
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr varlen row block toward front
        memmove(data_addr + ibyte_var_data_to_keep,
                data_addr + index_array[irow_of_blk_to_keep],
                nbytes_to_keep);

        const auto index_base = index_array[irow_of_blk_to_keep];
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto& index = index_array[irow_of_blk_to_keep + i];
          index = ibyte_var_data_to_keep + (index - index_base);
        }
      }
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;

      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_var_data_to_keep;
}
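
The variable-length case compacts the payload bytes the same way, but must also rebase each kept entry of the string-offset array so it points at the block's new location before compacting the offsets themselves. A self-contained sketch of payload compaction plus offset rebasing (toy buffers, not OmniSciDB code):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

int main() {
  // Payload "aa|bbb|c|dd"; index[i] is the byte offset where row i starts.
  std::string data = "aabbbcdd";
  std::vector<int32_t> index = {0, 2, 5, 6};
  const std::vector<uint64_t> dead = {1};  // drop row 1 ("bbb")
  const size_t nrows = index.size();

  int64_t keep_from = 0, fill_at = 0;
  size_t var_bytes_kept = 0;
  for (size_t i = 0; i <= dead.size(); ++i) {
    const bool last = (i == dead.size());
    const int64_t next_dead = last ? int64_t(nrows) : int64_t(dead[i]);
    const int64_t nrows_to_keep = next_dead - keep_from;
    if (nrows_to_keep > 0) {
      const int64_t end = (next_dead == int64_t(nrows)) ? int64_t(data.size())
                                                        : index[next_dead];
      const int64_t nbytes = end - index[keep_from];
      if (fill_at != keep_from) {
        // compact the surviving payload bytes toward the front
        std::memmove(&data[var_bytes_kept], &data[index[keep_from]], nbytes);
        // rebase each kept offset to the block's new position
        const int32_t base = index[keep_from];
        for (int64_t r = 0; r < nrows_to_keep; ++r) {
          index[fill_at + r] = int32_t(var_bytes_kept) + (index[keep_from + r] - base);
        }
      }
      var_bytes_kept += nbytes;
      fill_at += nrows_to_keep;
    }
    keep_from = next_dead + 1;
  }
  data.resize(var_bytes_kept);
  index.resize(fill_at);
  assert(data == "aacdd" && index == (std::vector<int32_t>{0, 2, 3}));
}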

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 180 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 182 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

std::deque<FragmentInfo> Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 184 of file InsertOrderFragmenter.h.

Referenced by getFragmentInfoFromId().

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 190 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 207 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 230 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 206 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: