OmniSciDB  1dac507f6e
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>

+ Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
+ Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns (inside a TableInfo object) the ids and row sizes of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as a string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Static Public Member Functions

static void updateColumn (const Catalog_Namespace::Catalog *catalog, const std::string &tab_name, const std::string &col_name, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll)
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling BufferMgr's createChunk() method to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< FragmentInfo > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order. It is likely the default fragmenter.

InsertOrderFragmenter

Definition at line 54 of file InsertOrderFragmenter.h.
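As a rough illustration of the insert-order policy, a minimal standalone sketch is shown below (FragmentModel and insert_rows are invented names, not the actual OmniSciDB API): rows always append to the newest fragment, and a fresh fragment is opened once maxFragmentRows is reached.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Minimal model of insert-order fragmentation: inserts always append to the
// newest fragment, and a fragment never exceeds max_fragment_rows rows.
struct FragmentModel {
  int fragment_id;
  std::size_t num_rows;
};

std::vector<FragmentModel> insert_rows(std::vector<FragmentModel> fragments,
                                       std::size_t rows_to_insert,
                                       std::size_t max_fragment_rows) {
  if (fragments.empty()) {
    fragments.push_back({0, 0});
  }
  while (rows_to_insert > 0) {
    FragmentModel& last = fragments.back();
    std::size_t space = max_fragment_rows - last.num_rows;
    if (space == 0) {
      // analogous to createNewFragment(): open a fresh fragment
      fragments.push_back({last.fragment_id + 1, 0});
      continue;
    }
    std::size_t n = std::min(space, rows_to_insert);
    last.num_rows += n;
    rows_to_insert -= n;
  }
  return fragments;
}

int main() {
  // 250 rows with a 100-row cap -> fragments of sizes 100, 100, 50
  for (const auto& f : insert_rows({}, 250, 100)) {
    std::cout << "fragment " << f.fragment_id << ": " << f.num_rows << " rows\n";
  }
}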

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 86 of file InsertOrderFragmenter.cpp.

86 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter &  )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1205 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfoFromId(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1210  {
1211  auto& fragment = getFragmentInfoFromId(fragment_id);
1212  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1213  const auto ncol = chunks.size();
1214 
1215  std::vector<int8_t> has_null_per_thread(ncol, 0);
1216  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1217  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1218  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
1219  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());
1220 
1221  // parallel delete columns
1222  std::vector<std::future<void>> threads;
1223  auto nrows_to_vacuum = frag_offsets.size();
1224  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1225  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1226 
1227  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1228  auto chunk = chunks[ci];
1229  const auto cd = chunk->get_column_desc();
1230  const auto& col_type = cd->columnType;
1231  auto data_buffer = chunk->get_buffer();
1232  auto index_buffer = chunk->get_index_buf();
1233  auto data_addr = data_buffer->getMemoryPtr();
1234  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1235  auto index_array = (StringOffsetT*)indices_addr;
1236  bool is_varlen = col_type.is_varlen_indeed();
1237 
1238  auto fixlen_vacuum = [=,
1239  &has_null_per_thread,
1240  &max_double_per_thread,
1241  &min_double_per_thread,
1242  &min_int64t_per_thread,
1243  &max_int64t_per_thread,
1244  &updel_roll,
1245  &frag_offsets,
1246  &fragment] {
1247  size_t nbytes_fix_data_to_keep;
1248  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1249 
1250  data_buffer->encoder->setNumElems(nrows_to_keep);
1251  data_buffer->setSize(nbytes_fix_data_to_keep);
1252  data_buffer->setUpdated();
1253 
1254  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1255 
1256  auto daddr = data_addr;
1257  auto element_size =
1258  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1259  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1260  if (col_type.is_fixlen_array()) {
1261  auto encoder =
1262  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1263  CHECK(encoder);
1264  encoder->updateMetadata((int8_t*)daddr);
1265  } else if (col_type.is_fp()) {
1266  set_chunk_stats(col_type,
1267  data_addr,
1268  has_null_per_thread[ci],
1269  min_double_per_thread[ci],
1270  max_double_per_thread[ci]);
1271  } else {
1272  set_chunk_stats(col_type,
1273  data_addr,
1274  has_null_per_thread[ci],
1275  min_int64t_per_thread[ci],
1276  max_int64t_per_thread[ci]);
1277  }
1278  }
1279  };
1280 
1281  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1282  size_t nbytes_var_data_to_keep;
1283  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1284 
1285  data_buffer->encoder->setNumElems(nrows_to_keep);
1286  data_buffer->setSize(nbytes_var_data_to_keep);
1287  data_buffer->setUpdated();
1288 
1289  index_array[nrows_to_keep] = data_buffer->size();
1290  index_buffer->setSize(sizeof(*index_array) *
1291  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1292  index_buffer->setUpdated();
1293 
1294  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1295  };
1296 
1297  if (is_varlen) {
1298  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1299  } else {
1300  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1301  }
1302  if (threads.size() >= (size_t)cpu_threads()) {
1303  wait_cleanup_threads(threads);
1304  }
1305  }
1306 
1307  wait_cleanup_threads(threads);
1308 
1309  auto key = std::make_pair(td, &fragment);
1310  updel_roll.numTuples[key] = nrows_to_keep;
1311  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1312  auto chunk = chunks[ci];
1313  auto cd = chunk->get_column_desc();
1314  if (!cd->columnType.is_fixlen_array()) {
1315  updateColumnMetadata(cd,
1316  fragment,
1317  chunk,
1318  has_null_per_thread[ci],
1319  max_double_per_thread[ci],
1320  min_double_per_thread[ci],
1321  max_int64t_per_thread[ci],
1322  min_int64t_per_thread[ci],
1323  cd->columnType,
1324  updel_roll);
1325  }
1326  }
1327 }
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, int8_t &has_null, T &min, T &max)
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:912
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
CHECK(cgen_state)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
FragmentInfo & getFragmentInfoFromId(const int fragment_id)
int cpu_threads()
Definition: thread_count.h:25
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
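The threading structure above follows a simple pattern: spawn one std::async task per chunk, but cap the number of in-flight tasks at cpu_threads(), draining them with wait_cleanup_threads() before spawning more. A minimal standalone sketch of that pattern (wait_for_all below is a stand-in for Fragmenter_Namespace::wait_cleanup_threads):

#include <algorithm>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Stand-in for Fragmenter_Namespace::wait_cleanup_threads(): wait and clear.
void wait_for_all(std::vector<std::future<void>>& tasks) {
  for (auto& t : tasks) {
    t.wait();
  }
  tasks.clear();
}

int main() {
  const size_t ncore = std::max(1u, std::thread::hardware_concurrency());
  std::vector<std::future<void>> tasks;
  for (int chunk_id = 0; chunk_id < 32; ++chunk_id) {
    tasks.emplace_back(std::async(std::launch::async, [chunk_id] {
      // ... per-chunk vacuum work would go here ...
    }));
    if (tasks.size() >= ncore) {  // cap in-flight tasks, as compactRows does
      wait_for_all(tasks);
    }
  }
  wait_for_all(tasks);  // drain the remainder
  std::cout << "done\n";
}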

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling BufferMgr's createChunk() method to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 588 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

589  {
590  // also sets the new fragment as the insertBuffer for each column
591 
592  maxFragmentId_++;
593  FragmentInfo newFragmentInfo;
594  newFragmentInfo.fragmentId = maxFragmentId_;
595  newFragmentInfo.shadowNumTuples = 0;
596  newFragmentInfo.setPhysicalNumTuples(0);
597  for (const auto levelSize : dataMgr_->levelSizes_) {
598  newFragmentInfo.deviceIds.push_back(newFragmentInfo.fragmentId % levelSize);
599  }
600  newFragmentInfo.physicalTableId = physicalTableId_;
601  newFragmentInfo.shard = shard_;
602 
603  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
604  colMapIt != columnMap_.end();
605  ++colMapIt) {
606  // colMapIt->second.unpin_buffer();
607  ChunkKey chunkKey = chunkKeyPrefix_;
608  chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
609  chunkKey.push_back(maxFragmentId_);
610  colMapIt->second.createChunkBuffer(
611  dataMgr_,
612  chunkKey,
613  memoryLevel,
614  newFragmentInfo.deviceIds[static_cast<int>(memoryLevel)],
615  pageSize_);
616  colMapIt->second.init_encoder();
617  }
618 
619  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
620  fragmentInfoVec_.push_back(newFragmentInfo);
621  return &(fragmentInfoVec_.back());
622 }
std::vector< int > ChunkKey
Definition: types.h:35
std::vector< int > levelSizes_
Definition: DataMgr.h:121
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 200 of file InsertOrderFragmenter.cpp.

References catalog_().

200  {
201  // Fix a verified loophole on sharded logical table which is locked using logical
202  // tableId while it's its physical tables that can come here when fragments overflow
203  // during COPY. Locks on a logical table and its physical tables never intersect, which
204  // means potential races. It'll be an overkill to resolve a logical table to physical
205  // tables in MapDHandler, ParseNode or other higher layers where the logical table is
206  // locked with Table Read/Write locks; it's easier to lock the logical table of its
207  // physical tables. A downside of this approach may be loss of parallel execution of
208  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
209  // operation, the loss seems not a big deal.
210  auto chunkKeyPrefix = chunkKeyPrefix_;
211  if (shard_ >= 0) {
212  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
213  }
214 
215  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
216  // SELECT and COPY may enter a deadlock
217  using namespace Lock_Namespace;
218  WriteLock delete_lock = TableLockMgr::getWriteLockForTable(chunkKeyPrefix);
219 
220  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
221 
222  for (const auto fragId : dropFragIds) {
223  for (const auto& col : columnMap_) {
224  int colId = col.first;
225  vector<int> fragPrefix = chunkKeyPrefix_;
226  fragPrefix.push_back(colId);
227  fragPrefix.push_back(fragId);
228  dataMgr_->deleteChunksWithPrefix(fragPrefix);
229  }
230  }
231 }
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:2916
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:354
mapd_unique_lock< MutexType > WriteLock
Definition: TableLockMgr.h:47
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 173 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

173  {
174  // not safe to call from outside insertData
175  // b/c depends on insertLock around numTuples_
176 
177  // don't ever drop the only fragment!
178  if (numTuples_ == fragmentInfoVec_.back().getPhysicalNumTuples()) {
179  return;
180  }
181 
182  if (numTuples_ > maxRows) {
183  size_t preNumTuples = numTuples_;
184  vector<int> dropFragIds;
185  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
186  while (numTuples_ > targetRows) {
187  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
188  size_t numFragTuples = fragmentInfoVec_[0].getPhysicalNumTuples();
189  dropFragIds.push_back(fragmentInfoVec_[0].fragmentId);
190  fragmentInfoVec_.pop_front();
191  CHECK_GE(numTuples_, numFragTuples);
192  numTuples_ -= numFragTuples;
193  }
194  deleteFragments(dropFragIds);
195  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
196  << " post: " << numTuples_ << " maxRows: " << maxRows;
197  }
198 }
#define LOG(tag)
Definition: Logger.h:185
#define CHECK_GE(x, y)
Definition: Logger.h:203
#define CHECK_GT(x, y)
Definition: Logger.h:202
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
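Numerically, the loop drops the oldest fragments until numTuples_ falls below maxRows * DROP_FRAGMENT_FACTOR, and never drops the only remaining fragment. A minimal sketch, assuming a placeholder factor of 0.9 (the macro's actual value is defined elsewhere and not shown on this page):

#include <cstddef>
#include <deque>
#include <iostream>
#include <vector>

// Assumed placeholder; the real DROP_FRAGMENT_FACTOR is defined elsewhere.
constexpr double kDropFragmentFactor = 0.9;

int main() {
  std::deque<size_t> fragment_rows{100, 100, 100, 100, 50};  // oldest first
  size_t num_tuples = 450;
  const size_t max_rows = 300;

  std::vector<size_t> dropped;
  const size_t target = static_cast<size_t>(max_rows * kDropFragmentFactor);
  // Drop oldest fragments until under target (guard against emptying the table).
  while (num_tuples > target && fragment_rows.size() > 1) {
    num_tuples -= fragment_rows.front();
    dropped.push_back(fragment_rows.front());
    fragment_rows.pop_front();
  }
  // With these numbers: the two oldest 100-row fragments are dropped,
  // leaving 250 tuples against a target of 270.
  std::cout << "dropped " << dropped.size() << " fragments, " << num_tuples
            << " tuples remain (target " << target << ")\n";
}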
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 102 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 88 of file InsertOrderFragmenter.cpp.

References Data_Namespace::DISK_LEVEL, and to_string().

88  {
89  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
90  // memory-resident tables won't have anything on disk
91  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
92  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
93 
94  // data comes like this - database_id, table_id, column_id, fragment_id
95  // but lets sort by database_id, table_id, fragment_id, column_id
96 
97  int fragment_subkey_index = 3;
98  std::sort(chunk_metadata.begin(),
99  chunk_metadata.end(),
100  [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
101  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
102  return pair1.first[3] < pair2.first[3];
103  });
104 
105  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
106  ++chunk_itr) {
107  int cur_column_id = chunk_itr->first[2];
108  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
109 
110  if (fragmentInfoVec_.empty() ||
111  cur_fragment_id != fragmentInfoVec_.back().fragmentId) {
112  maxFragmentId_ = cur_fragment_id;
113  fragmentInfoVec_.emplace_back();
114  fragmentInfoVec_.back().fragmentId = cur_fragment_id;
115  fragmentInfoVec_.back().setPhysicalNumTuples(chunk_itr->second.numElements);
116  numTuples_ += fragmentInfoVec_.back().getPhysicalNumTuples();
117  for (const auto level_size : dataMgr_->levelSizes_) {
118  fragmentInfoVec_.back().deviceIds.push_back(cur_fragment_id % level_size);
119  }
120  fragmentInfoVec_.back().shadowNumTuples =
121  fragmentInfoVec_.back().getPhysicalNumTuples();
122  fragmentInfoVec_.back().physicalTableId = physicalTableId_;
123  fragmentInfoVec_.back().shard = shard_;
124  } else {
125  if (chunk_itr->second.numElements !=
126  fragmentInfoVec_.back().getPhysicalNumTuples()) {
127  throw std::runtime_error(
128  "Inconsistency in num tuples within fragment for table " +
129  std::to_string(physicalTableId_) + ", Column " +
130  std::to_string(cur_column_id) + ". Fragment Tuples: " +
131  std::to_string(fragmentInfoVec_.back().getPhysicalNumTuples()) +
132  ", Chunk Tuples: " + std::to_string(chunk_itr->second.numElements));
133  }
134  }
135  fragmentInfoVec_.back().setChunkMetadata(cur_column_id, chunk_itr->second);
136  }
137  }
138 
139  ssize_t maxFixedColSize = 0;
140 
141  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
142  ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
143  if (size == -1) { // variable length
144  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
145  size = 8; // b/c we use this for string and array indices - gross to have magic
146  // number here
147  }
148  maxFixedColSize = std::max(maxFixedColSize, size);
149  }
150 
151  // this is maximum number of rows assuming everything is fixed length
152  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
153 
154  if (fragmentInfoVec_.size() > 0) {
155  // Now need to get the insert buffers for each column - should be last
156  // fragment
157  int lastFragmentId = fragmentInfoVec_.back().fragmentId;
158  int deviceId =
159  fragmentInfoVec_.back().deviceIds[static_cast<int>(defaultInsertLevel_)];
160  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
161  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
162  insertKey.push_back(colIt->first); // column id
163  insertKey.push_back(lastFragmentId); // fragment id
164  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
165  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
166  if (varLenColInfoIt != varLenColInfo_.end()) {
167  varLenColInfoIt->second = colIt->second.get_buffer()->size();
168  }
169  }
170  }
171 }
std::vector< int > ChunkKey
Definition: types.h:35
std::vector< int > levelSizes_
Definition: DataMgr.h:121
std::string to_string(char const *&&v)
void getChunkMetadataVecForKeyPrefix(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:330
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:
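The re-sort at the top of the listing is worth pausing on: chunk metadata arrives keyed as {database_id, table_id, column_id, fragment_id}, and the comparator reorders it by the fragment subkey (index 3) so fragments can be rebuilt one at a time. A minimal standalone sketch of that key layout and sort:

#include <algorithm>
#include <iostream>
#include <vector>

using ChunkKey = std::vector<int>;  // {database_id, table_id, column_id, fragment_id}

int main() {
  std::vector<ChunkKey> keys{
      {1, 7, 1, 0}, {1, 7, 2, 1}, {1, 7, 1, 1}, {1, 7, 2, 0}};
  // Keys arrive ordered by column id; sort by fragment id (index 3) instead,
  // mirroring the comparator in getChunkMetadata().
  std::sort(keys.begin(), keys.end(), [](const ChunkKey& a, const ChunkKey& b) {
    return a[3] < b[3];
  });
  for (const auto& k : keys) {
    std::cout << "column " << k[2] << ", fragment " << k[3] << "\n";
  }
}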

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 1015 of file UpdelStorage.cpp.

References catalog_, CHECK(), Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

1018  {
1019  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1020  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
1021  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1022  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1023  ++ncol;
1024  if (!cd->isVirtualCol) {
1025  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1026  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1027  ChunkKey chunk_key{
1028  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1029  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1030  &catalog_->getDataMgr(),
1031  chunk_key,
1032  memory_level,
1033  0,
1034  chunk_meta_it->second.numBytes,
1035  chunk_meta_it->second.numElements);
1036  chunks.push_back(chunk);
1037  }
1038  }
1039  }
1040  return chunks;
1041 }
std::vector< int > ChunkKey
Definition: types.h:35
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
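Note the loop shape `for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id)`: column ids may have gaps (for example after a dropped column), so the loop advances col_id until nColumns ids have actually resolved in the catalog, and virtual columns are skipped. A minimal standalone sketch of that iteration pattern, with an invented stand-in for the catalog lookup:

#include <iostream>
#include <map>
#include <string>

struct ColumnDesc {
  std::string name;
  bool is_virtual;
};

int main() {
  // Stand-in catalog metadata: column id 3 was dropped, 4 is virtual ($rowid).
  std::map<int, ColumnDesc> columns{
      {1, {"a", false}}, {2, {"b", false}}, {4, {"$rowid", true}}, {5, {"c", false}}};
  const int ncolumns = 4;  // columns the table still has

  // Walk candidate ids, counting only those that resolve in the catalog.
  for (int col_id = 1, ncol = 0; ncol < ncolumns; ++col_id) {
    auto it = columns.find(col_id);
    if (it == columns.end()) {
      continue;  // gap left by a dropped column
    }
    ++ncol;
    if (!it->second.is_virtual) {
      std::cout << "would fetch chunk for column " << col_id << " ("
                << it->second.name << ")\n";
    }
  }
}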

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 101 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

101 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as a string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 106 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo & Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected

Definition at line 48 of file UpdelStorage.cpp.

References CHECK(), and fragmentInfoVec_.

Referenced by compactRows(), updateColumn(), and updateColumns().

48  {
49  auto fragment_it = std::find_if(
50  fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
51  return f.fragmentId == fragment_id;
52  });
53  CHECK(fragment_it != fragmentInfoVec_.end());
54  return *fragment_it;
55 }
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns (inside a TableInfo object) the ids and row sizes of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 624 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

624  {
625  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
626  TableInfo queryInfo;
627  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
628  // right now we don't test predicate, so just return (copy of) all fragments
629  bool fragmentsExist = false;
630  if (fragmentInfoVec_.empty()) {
631  // If we have no fragments add a dummy empty fragment to make the executor
632  // not have separate logic for 0-row tables
633  int maxFragmentId = 0;
634  FragmentInfo emptyFragmentInfo;
635  emptyFragmentInfo.fragmentId = maxFragmentId;
636  emptyFragmentInfo.shadowNumTuples = 0;
637  emptyFragmentInfo.setPhysicalNumTuples(0);
638  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
639  emptyFragmentInfo.physicalTableId = physicalTableId_;
640  emptyFragmentInfo.shard = shard_;
641  queryInfo.fragments.push_back(emptyFragmentInfo);
642  } else {
643  fragmentsExist = true;
644  queryInfo.fragments = fragmentInfoVec_; // makes a copy
645  }
646  readLock.unlock();
647  queryInfo.setPhysicalNumTuples(0);
648  auto partIt = queryInfo.fragments.begin();
649  if (fragmentsExist) {
650  while (partIt != queryInfo.fragments.end()) {
651  if (partIt->getPhysicalNumTuples() == 0) {
652  // this means that a concurrent insert query inserted tuples into a new fragment
653  // but when the query came in we didn't have this fragment. To make sure we don't
654  // mess up the executor we delete this fragment from the metadatamap (fixes
655  // earlier bug found 2015-05-08)
656  partIt = queryInfo.fragments.erase(partIt);
657  } else {
658  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
659  partIt->getPhysicalNumTuples());
660  ++partIt;
661  }
662  }
663  } else {
664  // We added a dummy fragment and know the table is empty
665  queryInfo.setPhysicalNumTuples(0);
666  }
667  return queryInfo;
668 }
std::vector< int > levelSizes_
Definition: DataMgr.h:121

+ Here is the call graph for this function:

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1044 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1045  {
1046  const auto data_buffer = chunk->get_buffer();
1047  const auto data_addr = data_buffer->getMemoryPtr();
1048  const size_t nrows_in_chunk = data_buffer->size();
1049  const size_t ncore = cpu_threads();
1050  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1051  std::vector<std::vector<uint64_t>> deleted_offsets;
1052  deleted_offsets.resize(ncore);
1053  std::vector<std::future<void>> threads;
1054  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1055  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1056  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1057  const auto ithread = rbegin / segsz;
1058  CHECK(ithread < deleted_offsets.size());
1059  deleted_offsets[ithread].reserve(segsz);
1060  for (size_t r = rbegin; r < rend; ++r) {
1061  if (data_addr[r]) {
1062  deleted_offsets[ithread].push_back(r);
1063  }
1064  }
1065  }));
1066  }
1067  wait_cleanup_threads(threads);
1068  std::vector<uint64_t> all_deleted_offsets;
1069  for (size_t i = 0; i < ncore; ++i) {
1070  all_deleted_offsets.insert(
1071  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1072  }
1073  return all_deleted_offsets;
1074 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
CHECK(cgen_state)
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
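The scan partitions the delete-flag chunk into per-core segments, collects flagged row offsets per thread, then concatenates the per-thread vectors in segment order, so the combined offsets stay sorted. A standalone sketch of the same gather (deleted_flags is a stand-in for the chunk's buffer):

#include <algorithm>
#include <cstdint>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // Stand-in for the deleted-rows chunk: nonzero means the row was deleted.
  std::vector<int8_t> deleted_flags(100, 0);
  deleted_flags[3] = deleted_flags[42] = deleted_flags[97] = 1;

  const size_t nrows = deleted_flags.size();
  const size_t ncore = std::max(1u, std::thread::hardware_concurrency());
  const size_t segsz = (nrows + ncore - 1) / ncore;

  std::vector<std::vector<uint64_t>> per_thread(ncore);
  std::vector<std::future<void>> tasks;
  for (size_t rbegin = 0; rbegin < nrows; rbegin += segsz) {
    tasks.emplace_back(std::async(std::launch::async, [&, rbegin] {
      const size_t rend = std::min(rbegin + segsz, nrows);
      auto& out = per_thread[rbegin / segsz];
      for (size_t r = rbegin; r < rend; ++r) {
        if (deleted_flags[r]) {
          out.push_back(r);
        }
      }
    }));
  }
  for (auto& t : tasks) {
    t.wait();
  }
  // Concatenate in segment order: offsets come out sorted.
  std::vector<uint64_t> all;
  for (const auto& v : per_thread) {
    all.insert(all.end(), v.begin(), v.end());
  }
  for (auto r : all) {
    std::cout << "deleted row offset " << r << "\n";
  }
}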

void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 316 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

316  {
317  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
318  try {
319  mapd_unique_lock<mapd_shared_mutex> insertLock(
320  insertMutex_); // prevent two threads from trying to insert into the same table
321  // simultaneously
322 
323  insertDataImpl(insertDataStruct);
324 
325  if (defaultInsertLevel_ ==
326  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
327  dataMgr_->checkpoint(
328  chunkKeyPrefix_[0],
329  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
330  }
331  } catch (...) {
332  int32_t tableEpoch =
333  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
334 
335  // the statement below deletes *this* object!
336  // relying on exception propagation at this stage
337  // until we can sort this out in a cleaner fashion
338  catalog_->setTableEpoch(
339  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
340  throw;
341  }
342 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:406
void insertDataImpl(InsertData &insertDataStruct)
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2125
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2155

+ Here is the call graph for this function:

+ Here is the caller graph for this function:
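The try/catch above implements a checkpoint-or-rollback contract: data is checkpointed only when resident on disk and only after the whole insert succeeded; on any failure, the table is reset to its prior epoch and the exception is rethrown. A simplified standalone sketch of that pattern (Store and its methods are hypothetical stand-ins for the Catalog/DataMgr calls):

#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for the catalog/data-manager pair used by insertData().
struct Store {
  int epoch = 0;
  void insert_rows(bool fail) {
    if (fail) {
      throw std::runtime_error("insert failed");
    }
  }
  void checkpoint() { ++epoch; }        // like DataMgr::checkpoint(db, table)
  void set_epoch(int e) { epoch = e; }  // like Catalog::setTableEpoch(...)
};

void insert_with_rollback(Store& store, bool fail) {
  const int pre_insert_epoch = store.epoch;  // the last persisted state
  try {
    store.insert_rows(fail);
    store.checkpoint();  // persist only after the whole insert succeeded
  } catch (...) {
    store.set_epoch(pre_insert_epoch);  // roll back to the pre-insert state
    throw;                              // rely on exception propagation
  }
}

int main() {
  Store store;
  insert_with_rollback(store, /*fail=*/false);
  std::cout << "epoch after success: " << store.epoch << "\n";  // 1
  try {
    insert_with_rollback(store, /*fail=*/true);
  } catch (const std::exception& e) {
    std::cout << "rolled back after: " << e.what()
              << ", epoch: " << store.epoch << "\n";  // still 1
  }
}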

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 430 of file InsertOrderFragmenter.cpp.

References CHECK(), CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, Fragmenter_Namespace::FragmentInfo::fragmentId, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, Fragmenter_Namespace::InsertData::replicate_count, Fragmenter_Namespace::FragmentInfo::shadowChunkMetadataMap, and Fragmenter_Namespace::FragmentInfo::shadowNumTuples.

430  {
431  // populate deleted system column if it should exists, as it will not come from client
432  // Do not add this magical column in the replicate ALTER TABLE ADD route as
433  // it is not needed and will cause issues
434  std::unique_ptr<int8_t[]> data_for_deleted_column;
435  for (const auto& cit : columnMap_) {
436  if (cit.second.get_column_desc()->isDeletedCol &&
437  insertDataStruct.replicate_count == 0) {
438  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
439  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
440  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
441  insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
442  insertDataStruct.columnDescriptors[cit.first] = cit.second.get_column_desc();
443  break;
444  }
445  }
446  // MAT we need to add a removal of the empty column we pushed onto here
447  // for upstream safety. Should not be a problem but need to fix.
448 
449  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
450  for (const auto columnId : insertDataStruct.columnIds) {
451  if (columnMap_.end() == columnMap_.find(columnId)) {
452  columnMap_.emplace(
453  columnId, Chunk_NS::Chunk(insertDataStruct.columnDescriptors.at(columnId)));
454  }
455  }
456 
457  // when replicate (add) column(s), this path seems wont work; go separate route...
458  if (insertDataStruct.replicate_count > 0) {
459  replicateData(insertDataStruct);
460  return;
461  }
462 
463  std::unordered_map<int, int> inverseInsertDataColIdMap;
464  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
465  inverseInsertDataColIdMap.insert(
466  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
467  }
468 
469  size_t numRowsLeft = insertDataStruct.numRows;
470  size_t numRowsInserted = 0;
471  vector<DataBlockPtr> dataCopy =
472  insertDataStruct.data; // bc append data will move ptr forward and this violates
473  // constness of InsertData
474  if (numRowsLeft <= 0) {
475  return;
476  }
477 
478  FragmentInfo* currentFragment = 0;
479 
480  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
481  currentFragment = createNewFragment(defaultInsertLevel_);
482  } else {
483  currentFragment = &(fragmentInfoVec_.back());
484  }
485  size_t startFragment = fragmentInfoVec_.size() - 1;
486 
487  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
488  // loop until done inserting all rows
489  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
490  size_t rowsLeftInCurrentFragment =
491  maxFragmentRows_ - currentFragment->shadowNumTuples;
492  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
493  if (rowsLeftInCurrentFragment != 0) {
494  for (auto& varLenColInfoIt : varLenColInfo_) {
495  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
496  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
497  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
498  if (insertIdIt != inverseInsertDataColIdMap.end()) {
499  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
500  numRowsToInsert = std::min(
501  numRowsToInsert,
502  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
503  numRowsToInsert,
504  numRowsInserted,
505  bytesLeft));
506  }
507  }
508  }
509 
510  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
511  currentFragment = createNewFragment(defaultInsertLevel_);
512  if (numRowsInserted == 0) {
513  startFragment++;
514  }
515  rowsLeftInCurrentFragment = maxFragmentRows_;
516  for (auto& varLenColInfoIt : varLenColInfo_) {
517  varLenColInfoIt.second = 0; // reset byte counter
518  }
519  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
520  for (auto& varLenColInfoIt : varLenColInfo_) {
521  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
522  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
523  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
524  if (insertIdIt != inverseInsertDataColIdMap.end()) {
525  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
526  numRowsToInsert = std::min(
527  numRowsToInsert,
528  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
529  numRowsToInsert,
530  numRowsInserted,
531  bytesLeft));
532  }
533  }
534  }
535 
536  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
537  // never be able to insert anything
538 
539  // for each column, append the data in the appropriate insert buffer
540  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
541  int columnId = insertDataStruct.columnIds[i];
542  auto colMapIt = columnMap_.find(columnId);
543  CHECK(colMapIt != columnMap_.end());
544  currentFragment->shadowChunkMetadataMap[columnId] =
545  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
546  auto varLenColInfoIt = varLenColInfo_.find(columnId);
547  if (varLenColInfoIt != varLenColInfo_.end()) {
548  varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
549  }
550  }
551  if (hasMaterializedRowId_) {
552  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
553  currentFragment->shadowNumTuples;
554  int64_t* rowIdData = new int64_t[numRowsToInsert];
555  for (size_t i = 0; i < numRowsToInsert; ++i) {
556  rowIdData[i] = i + startId;
557  }
558  DataBlockPtr rowIdBlock;
559  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData);
560  auto colMapIt = columnMap_.find(rowIdColId_);
561  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
562  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
563  delete[] rowIdData;
564  }
565 
566  currentFragment->shadowNumTuples =
567  fragmentInfoVec_.back().getPhysicalNumTuples() + numRowsToInsert;
568  numRowsLeft -= numRowsToInsert;
569  numRowsInserted += numRowsToInsert;
570  }
571  {
572  // Only take the fragment info lock when updating fragment info map. Otherwise,
573  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
574  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
575  // COPY_FROM waits for TableWriteLock
576  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
577  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
578  partIt != fragmentInfoVec_.end();
579  ++partIt) {
580  partIt->setPhysicalNumTuples(partIt->shadowNumTuples);
581  partIt->setChunkMetadataMap(partIt->shadowChunkMetadataMap);
582  }
583  }
584  numTuples_ += insertDataStruct.numRows;
585  dropFragmentsToSize(maxRows_);
586 }
void dropFragmentsToSize(const size_t maxRows) override
Will truncate table to less than maxRows by dropping fragments.
#define CHECK_GT(x, y)
Definition: Logger.h:202
CHECK(cgen_state)
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:201
std::unordered_map< int, size_t > varLenColInfo_
void replicateData(const InsertData &insertDataStruct)
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:140

+ Here is the call graph for this function:
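The key arithmetic in the loop is the per-iteration cap on rows: first the rows remaining in the current fragment (maxFragmentRows_ - shadowNumTuples), then, for each variable-length column, however many of those rows still fit in the chunk's byte budget (maxChunkSize_ minus bytes already used). A standalone sketch of that computation (rows_that_fit is an invented stand-in for Chunk::getNumElemsForBytesInsertData):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for Chunk::getNumElemsForBytesInsertData(): how many of the next
// rows fit in bytes_left, given per-row payload sizes for a varlen column.
size_t rows_that_fit(const std::vector<size_t>& row_bytes,
                     size_t first_row,
                     size_t max_rows,
                     size_t bytes_left) {
  size_t n = 0, used = 0;
  while (n < max_rows && first_row + n < row_bytes.size() &&
         used + row_bytes[first_row + n] <= bytes_left) {
    used += row_bytes[first_row + n];
    ++n;
  }
  return n;
}

int main() {
  const size_t max_fragment_rows = 1000, shadow_num_tuples = 990;
  const size_t max_chunk_size = 64, varlen_bytes_used = 40;

  std::vector<size_t> row_bytes{10, 10, 10, 10, 10};  // incoming varlen payloads
  size_t rows_left_in_fragment = max_fragment_rows - shadow_num_tuples;  // 10
  size_t num_rows_to_insert = std::min(rows_left_in_fragment, row_bytes.size());

  // The varlen byte budget shrinks the cap further, as insertDataImpl does.
  size_t bytes_left = max_chunk_size - varlen_bytes_used;  // 24
  num_rows_to_insert = std::min(
      num_rows_to_insert, rows_that_fit(row_bytes, 0, num_rows_to_insert, bytes_left));

  std::cout << "will insert " << num_rows_to_insert << " rows now\n";  // 2
}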

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 344 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

344  {
345  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
346  mapd_unique_lock<mapd_shared_mutex> insertLock(
347  insertMutex_); // prevent two threads from trying to insert into the same table
348  // simultaneously
349  insertDataImpl(insertDataStruct);
350 }
void insertDataImpl(InsertData &insertDataStruct)

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter &  )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 352 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, CHECK(), Fragmenter_Namespace::InsertData::columnDescriptors, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

352  {
353  size_t numRowsLeft = insertDataStruct.numRows;
354  for (auto& fragmentInfo : fragmentInfoVec_) {
355  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysical();
356  auto numRowsToInsert = fragmentInfo.getPhysicalNumTuples(); // not getNumTuples()
357  size_t numRowsCanBeInserted;
358  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
359  if (insertDataStruct.bypass[i]) {
360  continue;
361  }
362  auto columnId = insertDataStruct.columnIds[i];
363  auto colDesc = insertDataStruct.columnDescriptors.at(columnId);
364  CHECK(columnMap_.find(columnId) != columnMap_.end());
365 
366  ChunkKey chunkKey = chunkKeyPrefix_;
367  chunkKey.push_back(columnId);
368  chunkKey.push_back(fragmentInfo.fragmentId);
369 
370  auto colMapIt = columnMap_.find(columnId);
371  auto& chunk = colMapIt->second;
372  if (chunk.isChunkOnDevice(
373  dataMgr_,
374  chunkKey,
375  defaultInsertLevel_,
376  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)])) {
377  dataMgr_->deleteChunksWithPrefix(chunkKey);
378  }
379  chunk.createChunkBuffer(
380  dataMgr_,
381  chunkKey,
382  defaultInsertLevel_,
383  fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)]);
384  chunk.init_encoder();
385 
386  try {
387  DataBlockPtr dataCopy = insertDataStruct.data[i];
388  auto size = colDesc->columnType.get_size();
389  if (0 > size) {
390  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
391  varLenColInfo_[columnId] = 0;
392  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
393  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
394  } else {
395  numRowsCanBeInserted = maxChunkSize_ / size;
396  }
397 
398  // FIXME: abort a case in which new column is wider than existing columns
399  if (numRowsCanBeInserted < numRowsToInsert) {
400  throw std::runtime_error("new column '" + colDesc->columnName +
401  "' wider than existing columns is not supported");
402  }
403 
404  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
405  {
406  std::unique_lock<std::mutex> lck(*fragmentInfo.mutex_access_inmem_states);
407  fragmentInfo.shadowChunkMetadataMap[columnId] = chunkMetadata;
408  }
409 
410  // update total size of var-len column in (actually the last) fragment
411  if (0 > size) {
412  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
413  varLenColInfo_[columnId] = chunk.get_buffer()->size();
414  }
415  } catch (...) {
416  dataMgr_->deleteChunksWithPrefix(chunkKey);
417  throw;
418  }
419  }
420  numRowsLeft -= numRowsToInsert;
421  }
422  CHECK(0 == numRowsLeft);
423 
424  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
425  for (auto& fragmentInfo : fragmentInfoVec_) {
426  fragmentInfo.setChunkMetadataMap(fragmentInfo.shadowChunkMetadataMap);
427  }
428 }
std::vector< int > ChunkKey
Definition: types.h:35
std::shared_ptr< std::mutex > mutex_access_inmem_states
CHECK(cgen_state)
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:354
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 233 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), ChunkMetadata::chunkStats, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), LOG, showChunk(), VLOG, and logger::WARNING.

235  {
236  /**
237  * WARNING: This method is entirely unlocked. Higher level locks are expected
238  * to prevent any table read or write during a chunk metadata update, since we
239  * need to modify various buffers and metadata maps.
240  */
241  if (shard_ >= 0) {
242  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
243  }
244 
245  CHECK(cd);
246  const auto column_id = cd->columnId;
247  const auto col_itr = columnMap_.find(column_id);
248  CHECK(col_itr != columnMap_.end());
249 
250  for (auto& fragment : fragmentInfoVec_) {
251  auto stats_itr = stats_map.find(fragment.fragmentId);
252  if (stats_itr != stats_map.end()) {
253  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(column_id);
254  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
255  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
256  physicalTableId_,
257  column_id,
258  fragment.fragmentId};
259  auto chunk = Chunk_NS::Chunk::getChunk(cd,
260  &catalog_->getDataMgr(),
261  chunk_key,
262  Data_Namespace::DISK_LEVEL,
263  0,
264  chunk_meta_it->second.numBytes,
265  chunk_meta_it->second.numElements);
266  auto buf = chunk->get_buffer();
267  CHECK(buf);
268  auto encoder = buf->encoder.get();
269  if (!encoder) {
270  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
271  }
272 
273  auto chunk_stats = stats_itr->second;
274 
275  ChunkMetadata old_chunk_metadata;
276  encoder->getMetadata(old_chunk_metadata);
277  auto& old_chunk_stats = old_chunk_metadata.chunkStats;
278 
279  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
280  // Use the logical type to display data, since the encoding should be ignored
281  const auto logical_ti = get_logical_type_info(cd->columnType);
282  if (!didResetStats) {
283  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
284  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
285  << DatumToString(chunk_stats.max, logical_ti);
286  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
287  << DatumToString(chunk_stats.min, logical_ti);
288  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
289  continue; // move to next fragment
290  }
291 
292  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
293  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
294  << DatumToString(chunk_stats.max, logical_ti);
295  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
296  << DatumToString(chunk_stats.min, logical_ti);
297  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
298 
299  // Reset fragment metadata map and set buffer to dirty
300  ChunkMetadata new_metadata;
301  // Run through fillChunkStats to ensure any transformations to the raw metadata
302  // values get applied (e.g. for date in days)
303  encoder->getMetadata(new_metadata);
304  fragment.setChunkMetadata(column_id, new_metadata);
305  fragment.shadowChunkMetadataMap =
306  fragment.getChunkMetadataMap(); // TODO(adb): needed?
307  buf->setDirty();
308  } else {
309  LOG(WARNING) << "No chunk stats update found for fragment " << fragment.fragmentId
310  << ", table " << physicalTableId_ << ", "
311  << ", column " << column_id;
312  }
313  }
314 }
std::vector< int > ChunkKey
Definition: types.h:35
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:193
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:177
#define LOG(tag)
Definition: Logger.h:185
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:869
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:176
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
SQLTypeInfo columnType
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:280

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const std::string &  tab_name,
const std::string &  col_name,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
static

Definition at line 63 of file UpdelStorage.cpp.

References CHECK(), Catalog_Namespace::Catalog::getMetadataForColumn(), and Catalog_Namespace::Catalog::getMetadataForTable().

Referenced by updateColumn().

71  {
72  const auto td = catalog->getMetadataForTable(tab_name);
73  CHECK(td);
74  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
75  CHECK(cd);
76  td->fragmenter->updateColumn(catalog,
77  td,
78  cd,
79  fragment_id,
80  frag_offsets,
81  rhs_values,
82  rhs_type,
83  memory_level,
84  updel_roll);
85 }
CHECK(cgen_state)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 606 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfoCore< TYPE_FACET_PACK >::get_comp_param(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_compression(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfoFromId(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_boolean(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

614  {
615  updel_roll.catalog = catalog;
616  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
617  updel_roll.memoryLevel = memory_level;
618 
619  const size_t ncore = cpu_threads();
620  const auto nrow = frag_offsets.size();
621  const auto n_rhs_values = rhs_values.size();
622  if (0 == nrow) {
623  return;
624  }
625  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
626 
627  auto& fragment = getFragmentInfoFromId(fragment_id);
628  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
629  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
630  ChunkKey chunk_key{
631  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
632  auto chunk = Chunk_NS::Chunk::getChunk(cd,
633  &catalog->getDataMgr(),
634  chunk_key,
635  Data_Namespace::CPU_LEVEL,
636  0,
637  chunk_meta_it->second.numBytes,
638  chunk_meta_it->second.numElements);
639 
640  std::vector<int8_t> has_null_per_thread(ncore, 0);
641  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
642  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
643  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
644  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
645 
646  // parallel update elements
647  std::vector<std::future<void>> threads;
648 
649  const auto segsz = (nrow + ncore - 1) / ncore;
650  auto dbuf = chunk->get_buffer();
651  auto dbuf_addr = dbuf->getMemoryPtr();
652  dbuf->setUpdated();
653  {
654  std::lock_guard<std::mutex> lck(updel_roll.mutex);
655  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
656  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
657  }
658 
659  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
660  cd->tableId,
661  cd->columnId,
662  fragment.fragmentId};
663  updel_roll.dirtyChunkeys.insert(chunkey);
664  }
665  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
666  threads.emplace_back(std::async(
667  std::launch::async,
668  [=,
669  &has_null_per_thread,
670  &min_int64t_per_thread,
671  &max_int64t_per_thread,
672  &min_double_per_thread,
673  &max_double_per_thread,
674  &frag_offsets,
675  &rhs_values] {
676  SQLTypeInfo lhs_type = cd->columnType;
677 
678  // !! not sure if this is an undocumented convention or a bug, but for a sharded
679  // table the dictionary id of an encoded string column is not specified by
680  // comp_param in the physical table but in the logical table; comp_param in the
681  // physical table is always 0, so we need to adapt accordingly...
682  auto cdl = (shard_ < 0)
683  ? cd
684  : catalog->getMetadataForColumn(
685  catalog->getLogicalTableId(td->tableId), cd->columnId);
686  CHECK(cdl);
687  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
688  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
689  lhs_type, &decimalOverflowValidator);
690  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
691  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
692  lhs_type, &dateDaysOverflowValidator);
693 
694  StringDictionary* stringDict{nullptr};
695  if (lhs_type.is_string()) {
696  CHECK(kENCODING_DICT == lhs_type.get_compression());
697  auto dictDesc = const_cast<DictDescriptor*>(
698  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
699  CHECK(dictDesc);
700  stringDict = dictDesc->stringDict.get();
701  CHECK(stringDict);
702  }
703 
704  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
705  const auto roffs = frag_offsets[r];
706  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
707  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
708  ScalarTargetValue sv2;
709 
710  // Subtle here are the two cases of string-to-string assignment, when
711  // upstream passes the RHS string as a string index instead of a preferred
712  // "real string".
713  // case #1. For "SET str_col = str_literal", it is hard to resolve the temp
714  // string index
715  // in this layer, so if upstream passes a string index here, an
716  // exception is thrown.
717  // case #2. For "SET str_col1 = str_col2", the RHS string index is converted
718  // to the LHS string index.
719  if (rhs_type.is_string()) {
720  if (const auto vp = boost::get<int64_t>(sv)) {
721  auto dictDesc = const_cast<DictDescriptor*>(
722  catalog->getMetadataForDict(rhs_type.get_comp_param()));
723  if (nullptr == dictDesc) {
724  throw std::runtime_error(
725  "UPDATE does not support cast from string literal to string "
726  "column.");
727  }
728  auto stringDict = dictDesc->stringDict.get();
729  CHECK(stringDict);
730  sv2 = NullableString(stringDict->getString(*vp));
731  sv = &sv2;
732  }
733  }
734 
735  if (const auto vp = boost::get<int64_t>(sv)) {
736  auto v = *vp;
737  if (lhs_type.is_string()) {
738  throw std::runtime_error("UPDATE does not support cast to string.");
739  }
740  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
741  if (lhs_type.is_decimal()) {
742  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
743  int64_t decimal_val;
744  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
745  tabulate_metadata(lhs_type,
746  min_int64t_per_thread[c],
747  max_int64t_per_thread[c],
748  has_null_per_thread[c],
749  (v == inline_int_null_value<int64_t>() &&
750  lhs_type.get_notnull() == false)
751  ? v
752  : decimal_val);
753  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
754  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
755  if (positive_v_and_negative_d || negative_v_and_positive_d) {
756  throw std::runtime_error(
757  "Data conversion overflow on " + std::to_string(v) +
758  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
759  std::to_string(rhs_type.get_scale()) + ") to (" +
760  std::to_string(lhs_type.get_dimension()) + ", " +
761  std::to_string(lhs_type.get_scale()) + ")");
762  }
763  } else if (is_integral(lhs_type)) {
764  if (lhs_type.is_date_in_days()) {
765  // Store meta values in seconds
766  if (lhs_type.get_size() == 2) {
767  nullAwareDateOverflowValidator.validate<int16_t>(v);
768  } else {
769  nullAwareDateOverflowValidator.validate<int32_t>(v);
770  }
771  int64_t days;
772  get_scalar<int64_t>(data_ptr, lhs_type, days);
773  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
774  tabulate_metadata(lhs_type,
775  min_int64t_per_thread[c],
776  max_int64t_per_thread[c],
777  has_null_per_thread[c],
778  (v == inline_int_null_value<int64_t>() &&
779  lhs_type.get_notnull() == false)
780  ? NullSentinelSupplier()(lhs_type, v)
781  : seconds);
782  } else {
783  int64_t target_value;
784  if (rhs_type.is_decimal()) {
785  target_value = round(decimal_to_double(rhs_type, v));
786  } else {
787  target_value = v;
788  }
789  tabulate_metadata(lhs_type,
790  min_int64t_per_thread[c],
791  max_int64t_per_thread[c],
792  has_null_per_thread[c],
793  target_value);
794  }
795  } else {
796  tabulate_metadata(
797  lhs_type,
798  min_double_per_thread[c],
799  max_double_per_thread[c],
800  has_null_per_thread[c],
801  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
802  }
803  } else if (const auto vp = boost::get<double>(sv)) {
804  auto v = *vp;
805  if (lhs_type.is_string()) {
806  throw std::runtime_error("UPDATE does not support cast to string.");
807  }
808  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
809  if (lhs_type.is_integer()) {
810  tabulate_metadata(lhs_type,
811  min_int64t_per_thread[c],
812  max_int64t_per_thread[c],
813  has_null_per_thread[c],
814  int64_t(v));
815  } else if (lhs_type.is_fp()) {
816  tabulate_metadata(lhs_type,
817  min_double_per_thread[c],
818  max_double_per_thread[c],
819  has_null_per_thread[c],
820  double(v));
821  } else {
822  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
823  "LHS with a floating RHS.";
824  }
825  } else if (const auto vp = boost::get<float>(sv)) {
826  auto v = *vp;
827  if (lhs_type.is_string()) {
828  throw std::runtime_error("UPDATE does not support cast to string.");
829  }
830  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
831  if (lhs_type.is_integer()) {
832  tabulate_metadata(lhs_type,
833  min_int64t_per_thread[c],
834  max_int64t_per_thread[c],
835  has_null_per_thread[c],
836  int64_t(v));
837  } else {
838  tabulate_metadata(lhs_type,
839  min_double_per_thread[c],
840  max_double_per_thread[c],
841  has_null_per_thread[c],
842  double(v));
843  }
844  } else if (const auto vp = boost::get<NullableString>(sv)) {
845  const auto s = boost::get<std::string>(vp);
846  const auto sval = s ? *s : std::string("");
847  if (lhs_type.is_string()) {
848  decltype(stringDict->getOrAdd(sval)) sidx;
849  {
850  std::unique_lock<std::mutex> lock(temp_mutex_);
851  sidx = stringDict->getOrAdd(sval);
852  }
853  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
854  tabulate_metadata(lhs_type,
855  min_int64t_per_thread[c],
856  max_int64t_per_thread[c],
857  has_null_per_thread[c],
858  int64_t(sidx));
859  } else if (sval.size() > 0) {
860  auto dval = std::atof(sval.data());
861  if (lhs_type.is_boolean()) {
862  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
863  } else if (lhs_type.is_time()) {
864  throw std::runtime_error(
865  "Date/Time/Timestamp update not supported through translated "
866  "string path.");
867  }
868  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
869  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
870  tabulate_metadata(lhs_type,
871  min_double_per_thread[c],
872  max_double_per_thread[c],
873  has_null_per_thread[c],
874  double(dval));
875  } else {
876  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
877  tabulate_metadata(lhs_type,
878  min_int64t_per_thread[c],
879  max_int64t_per_thread[c],
880  has_null_per_thread[c],
881  int64_t(dval));
882  }
883  } else {
884  put_null(data_ptr, lhs_type, cd->columnName);
885  has_null_per_thread[c] = true;
886  }
887  } else {
888  CHECK(false);
889  }
890  }
891  }));
892  if (threads.size() >= (size_t)cpu_threads()) {
893  wait_cleanup_threads(threads);
894  }
895  }
896  wait_cleanup_threads(threads);
897 
898  // for unit test
899  if (fragment.unconditionalVacuum_) {
900  if (cd->isDeletedCol) {
901  const auto deleted_offsets = getVacuumOffsets(chunk);
902  if (deleted_offsets.size() > 0) {
903  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
904  return;
905  }
906  }
907  }
908  bool has_null_per_chunk{false};
909  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
910  double min_double_per_chunk{std::numeric_limits<double>::max()};
911  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
912  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
913  for (size_t c = 0; c < ncore; ++c) {
914  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
915  max_double_per_chunk =
916  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
917  min_double_per_chunk =
918  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
919  max_int64t_per_chunk =
920  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
921  min_int64t_per_chunk =
922  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
923  }
924  updateColumnMetadata(cd,
925  fragment,
926  chunk,
927  has_null_per_chunk,
928  max_double_per_chunk,
929  min_double_per_chunk,
930  max_int64t_per_chunk,
931  min_int64t_per_chunk,
932  cd->columnType,
933  updel_roll);
934 }
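A minimal usage sketch for this overload follows. The fragmenter, catalog, td and cd handles are hypothetical and assumed to have been obtained from the Catalog elsewhere; a single RHS value fans out to every offset, per the CHECK at line 625:

 UpdelRoll updel_roll;
 std::vector<uint64_t> frag_offsets{3, 7, 9};             // rows to touch
 std::vector<ScalarTargetValue> rhs_values{int64_t(42)};  // one value fans out to all rows
 fragmenter->updateColumn(catalog,
                          td,
                          cd,
                          /*fragment_id=*/0,
                          frag_offsets,
                          rhs_values,
                          cd->columnType,             // rhs_type: same type as the column here
                          Data_Namespace::CPU_LEVEL,
                          updel_roll);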

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 87 of file UpdelStorage.cpp.

References updateColumn().

95  {
96  updateColumn(catalog,
97  td,
98  cd,
99  fragment_id,
100  frag_offsets,
101  std::vector<ScalarTargetValue>(1, rhs_value),
102  rhs_type,
103  memory_level,
104  updel_roll);
105 }
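This overload is a thin convenience wrapper: it broadcasts one ScalarTargetValue to every offset by packing it into a one-element vector. A hedged sketch of a call through it (all handles hypothetical):

 // Set a dictionary-encoded string column to one literal on selected rows.
 ScalarTargetValue rhs_value = NullableString(std::string("approved"));
 fragmenter->updateColumn(catalog, td, cd, fragment_id, frag_offsets,
                          rhs_value, cd->columnType,
                          Data_Namespace::CPU_LEVEL, updel_roll);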

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 936 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and ColumnDescriptor::tableId.

Referenced by compactRows(), and updateColumn().

945  {
946  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
947  auto key = std::make_pair(td, &fragment);
948  std::lock_guard<std::mutex> lck(updel_roll.mutex);
949  if (0 == updel_roll.chunkMetadata.count(key)) {
950  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
951  }
952  if (0 == updel_roll.numTuples.count(key)) {
953  updel_roll.numTuples[key] = fragment.shadowNumTuples;
954  }
955  auto& chunkMetadata = updel_roll.chunkMetadata[key];
956 
957  auto buffer = chunk->get_buffer();
958  const auto& lhs_type = cd->columnType;
959 
960  auto update_stats = [& encoder = buffer->encoder](auto min, auto max, auto has_null) {
961  static_assert(std::is_same<decltype(min), decltype(max)>::value,
962  "Type mismatch on min/max");
963  if (has_null) {
964  encoder->updateStats(decltype(min)(), true);
965  }
966  if (max < min) {
967  return;
968  }
969  encoder->updateStats(min, false);
970  encoder->updateStats(max, false);
971  };
972 
973  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
974  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
975  } else if (lhs_type.is_fp()) {
976  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
977  } else if (lhs_type.is_decimal()) {
978  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
979  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
980  has_null_per_chunk);
981  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
982  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
983  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
984  }
985  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
986 
987  // removed as @alex suggested. keep it commented in case there is any need to
988  // revisit it once the vacuum code is introduced. fragment.invalidateChunkMetadataMap();
989 }
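The update_stats lambda above only ever widens the encoder's stats: a null is recorded first, and a degenerate range (max < min, meaning no non-null value was written) is skipped. A self-contained sketch of the same per-thread to per-chunk fold, on plain integers rather than chunk encoders (illustrative only; types and names are not from the source):

 #include <algorithm>
 #include <cstdint>
 #include <limits>
 #include <vector>

 struct Stats {
   int64_t min{std::numeric_limits<int64_t>::max()};
   int64_t max{std::numeric_limits<int64_t>::min()};
   bool has_null{false};
 };

 // Fold per-thread stats into one per-chunk summary, as updateColumn does
 // before calling updateColumnMetadata.
 Stats fold(const std::vector<Stats>& per_thread) {
   Stats chunk;
   for (const auto& t : per_thread) {
     chunk.has_null = chunk.has_null || t.has_null;
     chunk.min = std::min(chunk.min, t.min);
     chunk.max = std::max(chunk.max, t.max);
   }
   return chunk;  // chunk.max < chunk.min means no non-null values were seen
 }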

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 296 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfoCore< TYPE_FACET_PACK >::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), Executor::getExecutor(), getFragmentInfoFromId(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfoCore< TYPE_FACET_PACK >::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

305  {
306  if (!is_feature_enabled<VarlenUpdates>()) {
307  throw std::runtime_error("varlen UPDATE path not enabled.");
308  }
309 
310  updelRoll.is_varlen_update = true;
311  updelRoll.catalog = catalog;
312  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
313  updelRoll.memoryLevel = memoryLevel;
314 
315  size_t num_entries = sourceDataProvider.getEntryCount();
316  size_t num_rows = sourceDataProvider.getRowCount();
317 
318  if (0 == num_rows) {
319  // bail out early
320  return;
321  }
322 
323  TargetValueConverterFactory factory;
324 
325  auto& fragment = getFragmentInfoFromId(fragmentId);
326  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
327  get_chunks(catalog, td, fragment, memoryLevel, chunks);
328  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
329  columnDescriptors.size());
330  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
331  std::shared_ptr<Executor> executor;
332 
333  if (g_enable_experimental_string_functions) {
334  executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
335  }
336 
337  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
338  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
339  auto chunk = chunks[indexOfChunk];
340  const auto chunk_cd = chunk->get_column_desc();
341 
342  if (chunk_cd->isDeletedCol) {
343  deletedChunk = chunk;
344  continue;
345  }
346 
347  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
348  columnDescriptors.end(),
349  [=](const ColumnDescriptor* cd) -> bool {
350  return cd->columnId == chunk_cd->columnId;
351  });
352 
353  if (targetColumnIt != columnDescriptors.end()) {
354  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
355 
356  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
357  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
358 
359  ConverterCreateParameter param{
360  num_rows,
361  *catalog,
362  sourceDataMetaInfo,
363  targetDescriptor,
364  targetDescriptor->columnType,
365  !targetDescriptor->columnType.get_notnull(),
366  sourceDataProvider.getLiteralDictionary(),
367  g_enable_experimental_string_functions
368  ? executor->getStringDictionaryProxy(
369  sourceDataMetaInfo.get_type_info().get_comp_param(),
370  executor->getRowSetMemoryOwner(),
371  true)
372  : nullptr};
373  auto converter = factory.create(param);
374  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
375 
376  if (targetDescriptor->columnType.is_geometry()) {
377  // geometry columns are composites
378  // need to skip chunks, depending on geo type
379  switch (targetDescriptor->columnType.get_type()) {
380  case kMULTIPOLYGON:
381  indexOfChunk += 5;
382  break;
383  case kPOLYGON:
384  indexOfChunk += 4;
385  break;
386  case kLINESTRING:
387  indexOfChunk += 2;
388  break;
389  case kPOINT:
390  indexOfChunk += 1;
391  break;
392  default:
393  CHECK(false); // not supported
394  }
395  }
396  } else {
397  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
398  std::unique_ptr<ChunkToInsertDataConverter> converter;
399 
400  if (chunk_cd->columnType.is_fixlen_array()) {
401  converter =
402  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
403  } else if (chunk_cd->columnType.is_string()) {
404  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
405  } else if (chunk_cd->columnType.is_geometry()) {
406  // the logical geo column is a string column
407  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
408  } else {
409  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
410  }
411 
412  chunkConverters.push_back(std::move(converter));
413 
414  } else if (chunk_cd->columnType.is_date_in_days()) {
415  /* Q: Why do we need this?
416  A: In the variable-length updates path we move the chunk content of a column
417  without decoding. Since it passes through DateDaysEncoder again,
418  the expected value should be in seconds, but here it will be in days.
419  Therefore, DateChunkConverter is used to scale chunk values to
420  seconds, which are then ultimately encoded back into days by DateDaysEncoder.
421  */
422  std::unique_ptr<ChunkToInsertDataConverter> converter;
423  const size_t physical_size = chunk_cd->columnType.get_size();
424  if (physical_size == 2) {
425  converter =
426  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
427  } else if (physical_size == 4) {
428  converter =
429  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
430  } else {
431  CHECK(false);
432  }
433  chunkConverters.push_back(std::move(converter));
434  } else {
435  std::unique_ptr<ChunkToInsertDataConverter> converter;
436  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
437  int logical_size = logical_type.get_size();
438  int physical_size = chunk_cd->columnType.get_size();
439 
440  if (logical_type.is_string()) {
441  // for dicts -> logical = physical
442  logical_size = physical_size;
443  }
444 
445  if (8 == physical_size) {
446  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
447  num_rows, chunk.get());
448  } else if (4 == physical_size) {
449  if (8 == logical_size) {
450  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
451  num_rows, chunk.get());
452  } else {
453  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
454  num_rows, chunk.get());
455  }
456  } else if (2 == chunk_cd->columnType.get_size()) {
457  if (8 == logical_size) {
458  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
459  num_rows, chunk.get());
460  } else if (4 == logical_size) {
461  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
462  num_rows, chunk.get());
463  } else {
464  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
465  num_rows, chunk.get());
466  }
467  } else if (1 == chunk_cd->columnType.get_size()) {
468  if (8 == logical_size) {
469  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
470  num_rows, chunk.get());
471  } else if (4 == logical_size) {
472  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
473  num_rows, chunk.get());
474  } else if (2 == logical_size) {
475  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
476  num_rows, chunk.get());
477  } else {
478  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
479  num_rows, chunk.get());
480  }
481  } else {
482  CHECK(false); // unknown
483  }
484 
485  chunkConverters.push_back(std::move(converter));
486  }
487  }
488  }
489 
490  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
491  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
492 
493  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
494  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
495  deletedChunk->get_column_desc()->tableId,
496  deletedChunk->get_column_desc()->columnId,
497  fragment.fragmentId};
498  updelRoll.dirtyChunkeys.insert(chunkey);
499  bool* deletedChunkBuffer =
500  reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());
501 
502  std::atomic<size_t> row_idx{0};
503 
504  auto row_converter = [&sourceDataProvider,
505  &sourceDataConverters,
506  &indexOffFragmentOffsetColumn,
507  &chunkConverters,
508  &deletedChunkBuffer,
509  &row_idx](size_t indexOfEntry) -> void {
510  // convert the source data
511  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
512  if (row.empty()) {
513  return;
514  }
515 
516  size_t indexOfRow = row_idx.fetch_add(1);
517 
518  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
519  if (sourceDataConverters[col]) {
520  const auto& mapd_variant = row[col];
521  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
522  }
523  }
524 
525  auto scalar = checked_get(
526  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
527  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
528 
529  // convert the remaining chunks
530  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
531  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
532  }
533 
534  // now mark the row as deleted
535  deletedChunkBuffer[indexInChunkBuffer] = true;
536  };
537 
538  bool can_go_parallel = num_rows > 20000;
539 
540  if (can_go_parallel) {
541  const size_t num_worker_threads = cpu_threads();
542  std::vector<std::future<void>> worker_threads;
543  for (size_t i = 0,
544  start_entry = 0,
545  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
546  i < num_worker_threads && start_entry < num_entries;
547  ++i, start_entry += stride) {
548  const auto end_entry = std::min(start_entry + stride, num_rows);
549  worker_threads.push_back(std::async(
550  std::launch::async,
551  [&row_converter](const size_t start, const size_t end) {
552  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
553  row_converter(indexOfRow);
554  }
555  },
556  start_entry,
557  end_entry));
558  }
559 
560  for (auto& child : worker_threads) {
561  child.wait();
562  }
563 
564  } else {
565  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
566  row_converter(entryIdx);
567  }
568  }
569 
570  InsertData insert_data;
571  insert_data.databaseId = catalog->getCurrentDB().dbId;
572  insert_data.tableId = td->tableId;
573 
574  for (size_t i = 0; i < chunkConverters.size(); i++) {
575  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
576  continue;
577  }
578 
579  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
580  if (sourceDataConverters[i]) {
581  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
582  }
583  continue;
584  }
585 
586  insert_data.numRows = num_rows;
587  insertDataNoCheckpoint(insert_data);
588 
589  // update metadata
590  if (!deletedChunk->get_buffer()->has_encoder) {
591  deletedChunk->init_encoder();
592  }
593  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);
594 
595  auto& shadowDeletedChunkMeta =
596  fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
597  if (shadowDeletedChunkMeta.numElements >
598  deletedChunk->get_buffer()->encoder->getNumElems()) {
599  // the append will have populated shadow meta data, otherwise use existing num
600  // elements
601  deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
602  }
603  deletedChunk->get_buffer()->setUpdated();
604 }
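The date-in-days Q/A comment above is easiest to see with a number: a 2- or 4-byte date chunk stores days since epoch, but the insert path expects epoch seconds, so each moved value is widened first. A sketch of that widening (assuming the 86400-seconds-per-day convention behind DateConverters::get_epoch_seconds_from_days; this is not the library's code):

 #include <cstdint>

 // Days-since-epoch -> seconds-since-epoch, the scaling a DateChunkConverter
 // applies before values pass through DateDaysEncoder again.
 int64_t epoch_seconds_from_days(int64_t days) {
   return days * 86400;  // e.g. 18000 days -> 1555200000 seconds
 }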

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 991 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

993  {
994  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
995  if (updel_roll.chunkMetadata.count(key)) {
996  auto& fragmentInfo = *key.second;
997  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
998  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
999  fragmentInfo.setChunkMetadataMap(chunkMetadata);
1000  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
1001  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
1002  // TODO(ppan): When fragment-level compaction is enabled, the following code
1003  // should suffice. When not (i.e. existing code), we'll revert to updating
1004  // InsertOrderFragmenter::varLenColInfo_
1005  /*
1006  for (const auto cit : chunkMetadata) {
1007  const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
1008  if (cd.columnType.get_size() < 0)
1009  fragmentInfo.varLenColInfox[cd.columnId] = cit.second.numBytes;
1010  }
1011  */
1012  }
1013 }
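updateMetadata publishes the metadata staged in the UpdelRoll for one (table, fragment) pair. A hedged sketch of a commit loop over every fragment an update touched (assuming MetaDataKey is the {TableDescriptor*, FragmentInfo*} pair built in updateColumnMetadata, as the make_pair call there suggests):

 // After one or more updateColumn() calls have filled updel_roll,
 // publish the staged chunk metadata and tuple counts per fragment.
 for (const auto& kv : updel_roll.chunkMetadata) {
   const MetaDataKey& key = kv.first;
   fragmenter->updateMetadata(catalog, key, updel_roll);
 }
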
auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1113 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1116  {
1117  const auto cd = chunk->get_column_desc();
1118  const auto& col_type = cd->columnType;
1119  auto data_buffer = chunk->get_buffer();
1120  auto data_addr = data_buffer->getMemoryPtr();
1121  auto element_size =
1122  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1123  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1124  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1125  size_t nbytes_fix_data_to_keep = 0;
1126  auto nrows_to_vacuum = frag_offsets.size();
1127  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1128  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1129  auto is_last_one = irow == nrows_to_vacuum;
1130  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1131  auto maddr_to_vacuum = data_addr;
1132  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1133  if (nrows_to_keep > 0) {
1134  auto nbytes_to_keep = nrows_to_keep * element_size;
1135  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1136  // move curr fixlen row block toward front
1137  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1138  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1139  nbytes_to_keep);
1140  }
1141  irow_of_blk_to_fill += nrows_to_keep;
1142  nbytes_fix_data_to_keep += nbytes_to_keep;
1143  }
1144  irow_of_blk_to_keep = irow_to_vacuum + 1;
1145  }
1146  return nbytes_fix_data_to_keep;
1147 }
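The loop above is a stable in-place compaction: it walks the sorted delete offsets and slides each surviving block of rows toward the front with one memmove per block. The same algorithm on a plain vector (a sketch under the assumption that del_offsets is sorted and duplicate-free, as the fragment offsets are; none of these names come from the source):

 #include <cstdint>
 #include <cstring>
 #include <vector>

 // Drop the rows named in del_offsets by sliding each kept block left.
 // Returns the number of bytes kept, mirroring vacuum_fixlen_rows.
 size_t vacuum_fixlen(std::vector<int32_t>& rows,
                      const std::vector<uint64_t>& del_offsets) {
   const size_t elem = sizeof(int32_t);
   int64_t keep_from = 0;  // head of next row block to keep
   int64_t fill_at = 0;    // row offset where that block lands
   size_t kept_bytes = 0;
   for (size_t i = 0; i <= del_offsets.size(); ++i) {
     const bool last = (i == del_offsets.size());
     const int64_t next_del = last ? rows.size() : del_offsets[i];
     const int64_t n_keep = next_del - keep_from;
     if (n_keep > 0) {
       if (fill_at != keep_from) {
         std::memmove(rows.data() + fill_at, rows.data() + keep_from, n_keep * elem);
       }
       fill_at += n_keep;
       kept_bytes += n_keep * elem;
     }
     keep_from = next_del + 1;
   }
   rows.resize(fill_at);  // e.g. {10,11,12,13,14} with {1,3} -> {10,12,14}
   return kept_bytes;
 }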


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1149 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1152  {
1153  auto data_buffer = chunk->get_buffer();
1154  auto index_buffer = chunk->get_index_buf();
1155  auto data_addr = data_buffer->getMemoryPtr();
1156  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1157  auto index_array = (StringOffsetT*)indices_addr;
1158  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1159  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1160  size_t nbytes_fix_data_to_keep = 0;
1161  size_t nbytes_var_data_to_keep = 0;
1162  auto nrows_to_vacuum = frag_offsets.size();
1163  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1164  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1165  auto is_last_one = irow == nrows_to_vacuum;
1166  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1167  auto maddr_to_vacuum = data_addr;
1168  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1169  if (nrows_to_keep > 0) {
1170  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1171  auto nbytes_to_keep =
1172  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1173  index_array[irow_of_blk_to_keep];
1174  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1175  // move curr varlen row block toward front
1176  memmove(data_addr + ibyte_var_data_to_keep,
1177  data_addr + index_array[irow_of_blk_to_keep],
1178  nbytes_to_keep);
1179 
1180  const auto index_base = index_array[irow_of_blk_to_keep];
1181  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1182  auto& index = index_array[irow_of_blk_to_keep + i];
1183  index = ibyte_var_data_to_keep + (index - index_base);
1184  }
1185  }
1186  nbytes_var_data_to_keep += nbytes_to_keep;
1187  maddr_to_vacuum = indices_addr;
1188 
1189  constexpr static auto index_element_size = sizeof(StringOffsetT);
1190  nbytes_to_keep = nrows_to_keep * index_element_size;
1191  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1192  // move curr fixlen row block toward front
1193  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1194  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1195  nbytes_to_keep);
1196  }
1197  irow_of_blk_to_fill += nrows_to_keep;
1198  nbytes_fix_data_to_keep += nbytes_to_keep;
1199  }
1200  irow_of_blk_to_keep = irow_to_vacuum + 1;
1201  }
1202  return nbytes_var_data_to_keep;
1203 }
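For variable-length data two things move: the payload bytes slide toward the front, and every surviving index entry is rebased so it still points at its string's new position. The rebasing step in isolation (a sketch; index holds StringOffsetT-style byte offsets into the payload, and the function name is illustrative):

 #include <cstdint>
 #include <vector>

 // Re-anchor a kept block of offsets: relative spacing is preserved, but the
 // block now starts at the number of payload bytes already kept, mirroring
 // the index fix-up inside vacuum_varlen_rows.
 void rebase_indices(std::vector<int32_t>& index,
                     int64_t first_kept_row,
                     int64_t n_kept_rows,
                     int64_t bytes_already_kept) {
   const int32_t base = index[first_kept_row];
   for (int64_t i = 0; i < n_kept_rows; ++i) {
     index[first_kept_row + i] =
         static_cast<int32_t>(bytes_already_kept + (index[first_kept_row + i] - base));
   }
 }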

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 180 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 182 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

std::deque<FragmentInfo> Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 184 of file InsertOrderFragmenter.h.

Referenced by getFragmentInfoFromId().

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 190 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 207 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 191 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 230 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 206 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: