OmniSciDB  7bf56492aa
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order; it is the default fragmenter. More...

#include <InsertOrderFragmenter.h>

Inheritance diagram for Fragmenter_Namespace::InsertOrderFragmenter:
Collaboration diagram for Fragmenter_Namespace::InsertOrderFragmenter:

Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns a TableInfo object containing the ids and row counts of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate the table to less than maxRows by dropping fragments. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Static Public Member Functions

static void updateColumn (const Catalog_Namespace::Catalog *catalog, const std::string &tab_name, const std::string &col_name, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll)
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter that fragments data in insert order; it is the default fragmenter.

InsertOrderFragmenter

Definition at line 50 of file InsertOrderFragmenter.h.
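
To make the insert-order policy concrete, here is a minimal, self-contained sketch. Everything in it (ToyFragment, appendRows, the row counts) is a simplified stand-in for illustration only; the real class additionally manages chunk buffers, encoders, locks, and per-column metadata.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Toy stand-in for FragmentInfo: only an id and a tuple count.
struct ToyFragment {
  int fragmentId;
  size_t numTuples;
};

// Append rows in insert order, opening a new fragment whenever the current
// one reaches maxFragmentRows (mirrors the loop in insertDataImpl below).
void appendRows(std::vector<ToyFragment>& fragments,
                size_t numRows,
                size_t maxFragmentRows) {
  if (fragments.empty()) {
    fragments.push_back({0, 0});
  }
  while (numRows > 0) {
    ToyFragment& current = fragments.back();
    const size_t space = maxFragmentRows - current.numTuples;
    if (space == 0) {
      fragments.push_back({current.fragmentId + 1, 0});  // "createNewFragment"
      continue;
    }
    const size_t toInsert = std::min(space, numRows);
    current.numTuples += toInsert;
    numRows -= toInsert;
  }
}

int main() {
  std::vector<ToyFragment> fragments;
  appendRows(fragments, 250, 100);  // 250 rows, 100 rows per fragment
  for (const ToyFragment& f : fragments) {
    std::cout << "fragment " << f.fragmentId << ": " << f.numTuples << " rows\n";
  }
  // fragments 0 and 1 hold 100 rows each, fragment 2 holds the remaining 50
}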

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 90 of file InsertOrderFragmenter.cpp.

90 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter & )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1202 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfo(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1207  {
1208  auto fragment_ptr = getFragmentInfo(fragment_id);
1209  auto& fragment = *fragment_ptr;
1210  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1211  const auto ncol = chunks.size();
1212 
1213  std::vector<int8_t> has_null_per_thread(ncol, 0);
1214  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1215  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1216  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::min());
1217  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::max());
1218 
1219  // parallel delete columns
1220  std::vector<std::future<void>> threads;
1221  auto nrows_to_vacuum = frag_offsets.size();
1222  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1223  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1224 
1225  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1226  auto chunk = chunks[ci];
1227  const auto cd = chunk->get_column_desc();
1228  const auto& col_type = cd->columnType;
1229  auto data_buffer = chunk->get_buffer();
1230  auto index_buffer = chunk->get_index_buf();
1231  auto data_addr = data_buffer->getMemoryPtr();
1232  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1233  auto index_array = (StringOffsetT*)indices_addr;
1234  bool is_varlen = col_type.is_varlen_indeed();
1235 
1236  auto fixlen_vacuum = [=,
1237  &has_null_per_thread,
1238  &max_double_per_thread,
1239  &min_double_per_thread,
1240  &min_int64t_per_thread,
1241  &max_int64t_per_thread,
1242  &updel_roll,
1243  &frag_offsets,
1244  &fragment] {
1245  size_t nbytes_fix_data_to_keep;
1246  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1247 
1248  data_buffer->encoder->setNumElems(nrows_to_keep);
1249  data_buffer->setSize(nbytes_fix_data_to_keep);
1250  data_buffer->setUpdated();
1251 
1252  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1253 
1254  auto daddr = data_addr;
1255  auto element_size =
1256  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1257  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1258  if (col_type.is_fixlen_array()) {
1259  auto encoder =
1260  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1261  CHECK(encoder);
1262  encoder->updateMetadata((int8_t*)daddr);
1263  } else if (col_type.is_fp()) {
1264  set_chunk_stats(col_type,
1265  data_addr,
1266  has_null_per_thread[ci],
1267  min_double_per_thread[ci],
1268  max_double_per_thread[ci]);
1269  } else {
1270  set_chunk_stats(col_type,
1271  data_addr,
1272  has_null_per_thread[ci],
1273  min_int64t_per_thread[ci],
1274  max_int64t_per_thread[ci]);
1275  }
1276  }
1277  };
1278 
1279  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1280  size_t nbytes_var_data_to_keep;
1281  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1282 
1283  data_buffer->encoder->setNumElems(nrows_to_keep);
1284  data_buffer->setSize(nbytes_var_data_to_keep);
1285  data_buffer->setUpdated();
1286 
1287  index_array[nrows_to_keep] = data_buffer->size();
1288  index_buffer->setSize(sizeof(*index_array) *
1289  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1290  index_buffer->setUpdated();
1291 
1292  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1293  };
1294 
1295  if (is_varlen) {
1296  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1297  } else {
1298  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1299  }
1300  if (threads.size() >= (size_t)cpu_threads()) {
1301  wait_cleanup_threads(threads);
1302  }
1303  }
1304 
1305  wait_cleanup_threads(threads);
1306 
1307  auto key = std::make_pair(td, &fragment);
1308  updel_roll.numTuples[key] = nrows_to_keep;
1309  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1310  auto chunk = chunks[ci];
1311  auto cd = chunk->get_column_desc();
1312  if (!cd->columnType.is_fixlen_array()) {
1313  updateColumnMetadata(cd,
1314  fragment,
1315  chunk,
1316  has_null_per_thread[ci],
1317  max_double_per_thread[ci],
1318  min_double_per_thread[ci],
1319  max_int64t_per_thread[ci],
1320  min_int64t_per_thread[ci],
1321  cd->columnType,
1322  updel_roll);
1323  }
1324  }
1325 }
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, int8_t &has_null, T &min, T &max)
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
int32_t StringOffsetT
Definition: sqltypes.h:839
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
CHECK(cgen_state)
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
int cpu_threads()
Definition: thread_count.h:25
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override

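
compactRows fans the per-chunk vacuum work out over std::async tasks but never keeps more than cpu_threads() of them in flight. Below is a minimal standalone sketch of that throttling pattern; max_tasks and wait_all are stand-ins for cpu_threads() and wait_cleanup_threads(), not the real helpers.

#include <algorithm>
#include <cstddef>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Stand-in for cpu_threads(): cap the number of in-flight tasks.
static size_t max_tasks() {
  return std::max<size_t>(1, std::thread::hardware_concurrency());
}

// Stand-in for wait_cleanup_threads(): drain all pending futures.
static void wait_all(std::vector<std::future<void>>& tasks) {
  for (auto& t : tasks) {
    t.wait();
  }
  tasks.clear();
}

int main() {
  std::vector<std::future<void>> tasks;
  const size_t num_chunks = 10;  // one vacuum task per column chunk
  for (size_t ci = 0; ci < num_chunks; ++ci) {
    tasks.emplace_back(std::async(std::launch::async, [ci] {
      // a real task would run the fixed-length or variable-length vacuum
      std::cout << "vacuuming chunk " << ci << "\n";
    }));
    if (tasks.size() >= max_tasks()) {
      wait_all(tasks);  // never keep more than one task per core in flight
    }
  }
  wait_all(tasks);
  return 0;
}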

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer

Definition at line 661 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::FragmentInfo::fragmentId.

662  {
663  // also sets the new fragment as the insertBuffer for each column
664 
665  maxFragmentId_++;
666  auto newFragmentInfo = std::make_unique<FragmentInfo>();
667  newFragmentInfo->fragmentId = maxFragmentId_;
668  newFragmentInfo->shadowNumTuples = 0;
669  newFragmentInfo->setPhysicalNumTuples(0);
670  for (const auto levelSize : dataMgr_->levelSizes_) {
671  newFragmentInfo->deviceIds.push_back(newFragmentInfo->fragmentId % levelSize);
672  }
673  newFragmentInfo->physicalTableId = physicalTableId_;
674  newFragmentInfo->shard = shard_;
675 
676  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
677  colMapIt != columnMap_.end();
678  ++colMapIt) {
679  // colMapIt->second.unpin_buffer();
680  ChunkKey chunkKey = chunkKeyPrefix_;
681  chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
682  chunkKey.push_back(maxFragmentId_);
683  colMapIt->second.createChunkBuffer(
684  dataMgr_,
685  chunkKey,
686  memoryLevel,
687  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
688  pageSize_);
689  colMapIt->second.init_encoder();
690  }
691 
692  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
693  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
694  return fragmentInfoVec_.back().get();
695 }
std::vector< int > ChunkKey
Definition: types.h:35
std::vector< int > levelSizes_
Definition: DataMgr.h:153
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::map< int, Chunk_NS::Chunk > columnMap_
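
The chunk keys built in the loop above follow the {database_id, table_id, column_id, fragment_id} layout described in getChunkMetadata(). A short, self-contained illustration with placeholder ids (1 and 42 are not real values):

#include <iostream>
#include <vector>

using ChunkKey = std::vector<int>;  // as in types.h

int main() {
  // chunkKeyPrefix_ carries the database and table ids; values are placeholders.
  ChunkKey chunkKeyPrefix = {1 /* db_id */, 42 /* table_id */};

  const int columnId = 3;
  const int newFragmentId = 7;

  // createNewFragment extends the prefix once per column, as above.
  ChunkKey chunkKey = chunkKeyPrefix;
  chunkKey.push_back(columnId);
  chunkKey.push_back(newFragmentId);

  for (int k : chunkKey) {
    std::cout << k << ' ';
  }
  std::cout << '\n';  // prints: 1 42 3 7
}
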
void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 209 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

209  {
210  // Fix a verified loophole on sharded logical table which is locked using logical
211  // tableId while it's its physical tables that can come here when fragments overflow
212  // during COPY. Locks on a logical table and its physical tables never intersect, which
213  // means potential races. It'll be an overkill to resolve a logical table to physical
214  // tables in MapDHandler, ParseNode or other higher layers where the logical table is
215  // locked with Table Read/Write locks; it's easier to lock the logical table of its
216  // physical tables. A downside of this approach may be loss of parallel execution of
217  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
218  // operation, the loss seems not a big deal.
219  auto chunkKeyPrefix = chunkKeyPrefix_;
220  if (shard_ >= 0) {
221  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
222  }
223 
224  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
225  // SELECT and COPY may enter a deadlock
226  const auto delete_lock =
228 
229  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
230 
231  for (const auto fragId : dropFragIds) {
232  for (const auto& col : columnMap_) {
233  int colId = col.first;
234  vector<int> fragPrefix = chunkKeyPrefix_;
235  fragPrefix.push_back(colId);
236  fragPrefix.push_back(fragId);
237  dataMgr_->deleteChunksWithPrefix(fragPrefix);
238  }
239  }
240 }
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:88
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:3343
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:415
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 455 of file InsertOrderFragmenter.cpp.

455  {
456  // prevent concurrent insert rows and drop column
457  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
458  // synchronize concurrent accesses to fragmentInfoVec_
459  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
460  for (auto const& fragmentInfo : fragmentInfoVec_) {
461  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
462  }
463 
464  for (const auto columnId : columnIds) {
465  auto cit = columnMap_.find(columnId);
466  if (columnMap_.end() != cit) {
467  columnMap_.erase(cit);
468  }
469 
470  vector<int> fragPrefix = chunkKeyPrefix_;
471  fragPrefix.push_back(columnId);
472  dataMgr_->deleteChunksWithPrefix(fragPrefix);
473 
474  for (const auto& fragmentInfo : fragmentInfoVec_) {
475  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
476  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
477  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
478  }
479  }
480  }
481  for (const auto& fragmentInfo : fragmentInfoVec_) {
482  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
483  }
484 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:415
std::map< int, Chunk_NS::Chunk > columnMap_
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
override virtual

Will truncate the table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 182 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

182  {
183  // not safe to call from outside insertData
184  // b/c depends on insertLock around numTuples_
185 
186  // don't ever drop the only fragment!
187  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
188  return;
189  }
190 
191  if (numTuples_ > maxRows) {
192  size_t preNumTuples = numTuples_;
193  vector<int> dropFragIds;
194  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
195  while (numTuples_ > targetRows) {
196  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
197  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
198  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
199  fragmentInfoVec_.pop_front();
200  CHECK_GE(numTuples_, numFragTuples);
201  numTuples_ -= numFragTuples;
202  }
203  deleteFragments(dropFragIds);
204  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
205  << " post: " << numTuples_ << " maxRows: " << maxRows;
206  }
207 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:188
#define CHECK_GE(x, y)
Definition: Logger.h:210
#define CHECK_GT(x, y)
Definition: Logger.h:209
void deleteFragments(const std::vector< int > &dropFragIds)
#define DROP_FRAGMENT_FACTOR
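
A worked example of the arithmetic in the loop above, with made-up numbers. The real DROP_FRAGMENT_FACTOR is a macro not shown on this page, so 0.9 below is purely an assumption for illustration.

#include <cstddef>
#include <deque>
#include <iostream>

int main() {
  const double kDropFragmentFactor = 0.9;  // assumed value of DROP_FRAGMENT_FACTOR
  const size_t maxRows = 1000;
  const size_t targetRows = static_cast<size_t>(maxRows * kDropFragmentFactor);  // 900

  // physical tuple counts of the existing fragments, oldest first
  std::deque<size_t> fragmentTuples = {400, 400, 400};
  size_t numTuples = 1200;

  // drop the oldest fragments until the table is at or below the target
  while (numTuples > targetRows && fragmentTuples.size() > 1) {
    numTuples -= fragmentTuples.front();
    fragmentTuples.pop_front();
  }
  std::cout << "rows kept: " << numTuples << '\n';  // 800, which is <= 900
}
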
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 101 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 92 of file InsertOrderFragmenter.cpp.

References CHECK(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, and to_string().

92  {
95  // memory-resident tables won't have anything on disk
96  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
97  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
98 
99  // data comes like this - database_id, table_id, column_id, fragment_id
100  // but lets sort by database_id, table_id, fragment_id, column_id
101 
102  int fragment_subkey_index = 3;
103  std::sort(chunk_metadata.begin(),
104  chunk_metadata.end(),
105  [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
106  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
107  return pair1.first[3] < pair2.first[3];
108  });
109 
110  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
111  ++chunk_itr) {
112  int cur_column_id = chunk_itr->first[2];
113  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
114 
115  if (fragmentInfoVec_.empty() ||
116  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
117  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
118  CHECK(new_fragment_info);
119  maxFragmentId_ = cur_fragment_id;
120  new_fragment_info->fragmentId = cur_fragment_id;
121  new_fragment_info->setPhysicalNumTuples(chunk_itr->second.numElements);
122  numTuples_ += new_fragment_info->getPhysicalNumTuples();
123  for (const auto level_size : dataMgr_->levelSizes_) {
124  new_fragment_info->deviceIds.push_back(cur_fragment_id % level_size);
125  }
126  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
127  new_fragment_info->physicalTableId = physicalTableId_;
128  new_fragment_info->shard = shard_;
129  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
130  } else {
131  if (chunk_itr->second.numElements !=
132  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
133  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
134  std::to_string(physicalTableId_) + ", Column " +
135  std::to_string(cur_column_id) + ". Fragment Tuples: " +
136  std::to_string(
137  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
138  ", Chunk Tuples: " +
139  std::to_string(chunk_itr->second.numElements);
140  }
141  }
142  CHECK(fragmentInfoVec_.back().get());
143  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
144  }
145  }
146 
147  ssize_t maxFixedColSize = 0;
148 
149  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
150  ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
151  if (size == -1) { // variable length
152  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
153  size = 8; // b/c we use this for string and array indices - gross to have magic
154  // number here
155  }
156  maxFixedColSize = std::max(maxFixedColSize, size);
157  }
158 
159  // this is maximum number of rows assuming everything is fixed length
160  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
161 
162  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
163  // Now need to get the insert buffers for each column - should be last
164  // fragment
165  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
166  // TODO: add accessor here for safe indexing
167  int deviceId =
168  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
169  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
170  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
171  insertKey.push_back(colIt->first); // column id
172  insertKey.push_back(lastFragmentId); // fragment id
173  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
174  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
175  if (varLenColInfoIt != varLenColInfo_.end()) {
176  varLenColInfoIt->second = colIt->second.get_buffer()->size();
177  }
178  }
179  }
180 }
std::vector< int > ChunkKey
Definition: types.h:35
std::vector< int > levelSizes_
Definition: DataMgr.h:153
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
#define LOG(tag)
Definition: Logger.h:188
std::string to_string(char const *&&v)
CHECK(cgen_state)
void getChunkMetadataVecForKeyPrefix(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec, const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:391
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_
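
The clamp near the end of the listing guarantees that a chunk of the widest fixed-length column never exceeds maxChunkSize_. A tiny worked example with assumed sizes (the 1 GB limit and 32M rows are placeholders, not the actual defaults):

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  // Widest fixed-length column is an 8-byte integer; variable-length columns
  // count as 8 bytes for their offset entries.
  const size_t maxFixedColSize = 8;
  const size_t maxChunkSize = size_t(1) << 30;  // assumed 1 GB chunk limit
  size_t maxFragmentRows = 32000000;            // requested fragment size

  // getChunkMetadata clamps the fragment size so a chunk of the widest
  // fixed-length column can never exceed maxChunkSize.
  maxFragmentRows = std::min(maxFragmentRows, maxChunkSize / maxFixedColSize);

  std::cout << maxFragmentRows << '\n';  // 32000000 (1 GB / 8 bytes = 134217728 is larger)
}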


auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 1012 of file UpdelStorage.cpp.

References catalog_, CHECK(), Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

1015  {
1016  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1017  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
1018  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1019  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1020  ++ncol;
1021  if (!cd->isVirtualCol) {
1022  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1023  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1024  ChunkKey chunk_key{
1025  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1026  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1027  &catalog_->getDataMgr(),
1028  chunk_key,
1029  memory_level,
1030  0,
1031  chunk_meta_it->second.numBytes,
1032  chunk_meta_it->second.numElements);
1033  chunks.push_back(chunk);
1034  }
1035  }
1036  }
1037  return chunks;
1038 }
std::vector< int > ChunkKey
Definition: types.h:35
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:183
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:182
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28


int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inline override virtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 100 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

100 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inline override virtual

get fragmenter's type (as string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 105 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
override virtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 332 of file InsertOrderFragmenter.cpp.

References CHECK().

Referenced by compactRows(), updateColumn(), and updateColumns().

332  {
333  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
334  fragmentInfoVec_.end(),
335  [fragment_id](const auto& fragment) -> bool {
336  return fragment->fragmentId == fragment_id;
337  });
338  CHECK(fragment_it != fragmentInfoVec_.end());
339  return fragment_it->get();
340 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
CHECK(cgen_state)


FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
override virtual

returns a TableInfo object containing the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 697 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

697  {
698  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
699  TableInfo queryInfo;
700  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
701  // right now we don't test predicate, so just return (copy of) all fragments
702  bool fragmentsExist = false;
703  if (fragmentInfoVec_.empty()) {
704  // If we have no fragments add a dummy empty fragment to make the executor
705  // not have separate logic for 0-row tables
706  int maxFragmentId = 0;
707  FragmentInfo emptyFragmentInfo;
708  emptyFragmentInfo.fragmentId = maxFragmentId;
709  emptyFragmentInfo.shadowNumTuples = 0;
710  emptyFragmentInfo.setPhysicalNumTuples(0);
711  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
712  emptyFragmentInfo.physicalTableId = physicalTableId_;
713  emptyFragmentInfo.shard = shard_;
714  queryInfo.fragments.push_back(emptyFragmentInfo);
715  } else {
716  fragmentsExist = true;
717  std::for_each(
718  fragmentInfoVec_.begin(),
719  fragmentInfoVec_.end(),
720  [&queryInfo](const auto& fragment_owned_ptr) {
721  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
722  });
723  }
724  readLock.unlock();
725  queryInfo.setPhysicalNumTuples(0);
726  auto partIt = queryInfo.fragments.begin();
727  if (fragmentsExist) {
728  while (partIt != queryInfo.fragments.end()) {
729  if (partIt->getPhysicalNumTuples() == 0) {
730  // this means that a concurrent insert query inserted tuples into a new fragment
731  // but when the query came in we didn't have this fragment. To make sure we don't
732  // mess up the executor we delete this fragment from the metadatamap (fixes
733  // earlier bug found 2015-05-08)
734  partIt = queryInfo.fragments.erase(partIt);
735  } else {
736  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
737  partIt->getPhysicalNumTuples());
738  ++partIt;
739  }
740  }
741  } else {
742  // We added a dummy fragment and know the table is empty
743  queryInfo.setPhysicalNumTuples(0);
744  }
745  return queryInfo;
746 }
std::vector< int > levelSizes_
Definition: DataMgr.h:153
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_

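
The post-processing loop above removes fragments that report zero tuples and accumulates the total row count. A standalone sketch of that filtering step; ToyFragmentInfo is a stand-in for the copied FragmentInfo objects, not the real type.

#include <cstddef>
#include <iostream>
#include <vector>

// Minimal stand-in for the fragment copies returned in TableInfo::fragments.
struct ToyFragmentInfo {
  int fragmentId;
  size_t physicalNumTuples;
};

int main() {
  std::vector<ToyFragmentInfo> fragments = {{0, 100}, {1, 0}, {2, 57}};

  // Drop 0-row fragments (a concurrent insert may have registered a fragment
  // before any tuples landed) and sum the rest, as in the loop above.
  size_t numTuples = 0;
  for (auto it = fragments.begin(); it != fragments.end();) {
    if (it->physicalNumTuples == 0) {
      it = fragments.erase(it);
    } else {
      numTuples += it->physicalNumTuples;
      ++it;
    }
  }
  std::cout << fragments.size() << " fragments, " << numTuples << " rows\n";
  // prints: 2 fragments, 157 rows
}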

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inline override virtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1041 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1042  {
1043  const auto data_buffer = chunk->get_buffer();
1044  const auto data_addr = data_buffer->getMemoryPtr();
1045  const size_t nrows_in_chunk = data_buffer->size();
1046  const size_t ncore = cpu_threads();
1047  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1048  std::vector<std::vector<uint64_t>> deleted_offsets;
1049  deleted_offsets.resize(ncore);
1050  std::vector<std::future<void>> threads;
1051  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1052  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1053  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1054  const auto ithread = rbegin / segsz;
1055  CHECK(ithread < deleted_offsets.size());
1056  deleted_offsets[ithread].reserve(segsz);
1057  for (size_t r = rbegin; r < rend; ++r) {
1058  if (data_addr[r]) {
1059  deleted_offsets[ithread].push_back(r);
1060  }
1061  }
1062  }));
1063  }
1064  wait_cleanup_threads(threads);
1065  std::vector<uint64_t> all_deleted_offsets;
1066  for (size_t i = 0; i < ncore; ++i) {
1067  all_deleted_offsets.insert(
1068  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1069  }
1070  return all_deleted_offsets;
1071 }
void wait_cleanup_threads(std::vector< std::future< void >> &threads)
CHECK(cgen_state)
int cpu_threads()
Definition: thread_count.h:25

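
getVacuumOffsets splits the scan of the delete column into one contiguous segment per core. The segment arithmetic on its own, with assumed row and core counts:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  // Same segment arithmetic as getVacuumOffsets, with placeholder numbers.
  const size_t nrows_in_chunk = 1000;
  const size_t ncore = 8;  // stand-in for cpu_threads()
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;  // ceiling division -> 125

  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
    const size_t rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
    const size_t ithread = rbegin / segsz;
    std::cout << "thread " << ithread << " scans rows [" << rbegin << ", " << rend << ")\n";
  }
}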

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
override virtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 486 of file InsertOrderFragmenter.cpp.

References CHECK().

486  {
487  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
488 
489  for (auto const& fragment : fragmentInfoVec_) {
490  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
491  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
492  const auto& chunk_stats = chunk_meta_it->second.chunkStats;
493  if (chunk_stats.max.tinyintval == 1) {
494  return true;
495  }
496  }
497  return false;
498 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
CHECK(cgen_state)


void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
override virtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 342 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

342  {
343  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
344  try {
345  mapd_unique_lock<mapd_shared_mutex> insertLock(
346  insertMutex_); // prevent two threads from trying to insert into the same table
347  // simultaneously
348 
349  insertDataImpl(insertDataStruct);
350 
351  if (defaultInsertLevel_ ==
352  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
353  dataMgr_->checkpoint(
354  chunkKeyPrefix_[0],
355  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
356  }
357  } catch (...) {
358  int32_t tableEpoch =
359  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
360 
361  // the statement below deletes *this* object!
362  // relying on exception propagation at this stage
363  // until we can sort this out in a cleaner fashion
364  catalog_->setTableEpoch(
365  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
366  throw;
367  }
368 }
void checkpoint(const int db_id, const int tb_id)
Definition: DataMgr.cpp:467
void insertDataImpl(InsertData &insertDataStruct)
int32_t getTableEpoch(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:2485
void setTableEpoch(const int db_id, const int table_id, const int new_epoch)
Definition: Catalog.cpp:2515

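
For orientation, a sketch of how a caller might assemble the insert payload. ToyDataBlock and ToyInsertData below are simplified stand-ins exposing only the fields named on this page (databaseId, tableId, columnIds, data, numRows); the real structs are defined in Fragmenter.h and carry more members.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct ToyDataBlock {
  int8_t* numbersPtr;
};
struct ToyInsertData {
  int databaseId;
  int tableId;
  std::vector<int> columnIds;
  std::vector<ToyDataBlock> data;  // one block per column
  size_t numRows;
};

int main() {
  // three rows of a single 64-bit integer column, stored contiguously
  std::vector<int64_t> col_values = {10, 20, 30};

  ToyInsertData insert_data;
  insert_data.databaseId = 1;  // placeholder ids
  insert_data.tableId = 42;
  insert_data.columnIds = {1};
  insert_data.data = {{reinterpret_cast<int8_t*>(col_values.data())}};
  insert_data.numRows = col_values.size();

  // with the real structs, the call would then be:
  //   fragmenter->insertData(insert_data);
  std::cout << "prepared " << insert_data.numRows << " rows for column "
            << insert_data.columnIds.front() << '\n';
}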

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 500 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

500  {
501  // populate deleted system column if it should exists, as it will not come from client
502  // Do not add this magical column in the replicate ALTER TABLE ADD route as
503  // it is not needed and will cause issues
504  std::unique_ptr<int8_t[]> data_for_deleted_column;
505  for (const auto& cit : columnMap_) {
506  if (cit.second.get_column_desc()->isDeletedCol &&
507  insertDataStruct.replicate_count == 0) {
508  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
509  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
510  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
511  insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
512  break;
513  }
514  }
515  // MAT we need to add a removal of the empty column we pushed onto here
516  // for upstream safety. Should not be a problem but need to fix.
517 
518  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
519  for (const auto columnId : insertDataStruct.columnIds) {
520  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
521  CHECK(columnDesc);
522  if (columnMap_.end() == columnMap_.find(columnId)) {
523  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
524  }
525  }
526 
527  // when replicate (add) column(s), this path seems wont work; go separate route...
528  if (insertDataStruct.replicate_count > 0) {
529  replicateData(insertDataStruct);
530  return;
531  }
532 
533  std::unordered_map<int, int> inverseInsertDataColIdMap;
534  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
535  inverseInsertDataColIdMap.insert(
536  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
537  }
538 
539  size_t numRowsLeft = insertDataStruct.numRows;
540  size_t numRowsInserted = 0;
541  vector<DataBlockPtr> dataCopy =
542  insertDataStruct.data; // bc append data will move ptr forward and this violates
543  // constness of InsertData
544  if (numRowsLeft <= 0) {
545  return;
546  }
547 
548  FragmentInfo* currentFragment{nullptr};
549 
550  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
551  currentFragment = createNewFragment(defaultInsertLevel_);
552  } else {
553  currentFragment = fragmentInfoVec_.back().get();
554  }
555  CHECK(currentFragment);
556 
557  size_t startFragment = fragmentInfoVec_.size() - 1;
558 
559  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
560  // loop until done inserting all rows
561  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
562  size_t rowsLeftInCurrentFragment =
563  maxFragmentRows_ - currentFragment->shadowNumTuples;
564  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
565  if (rowsLeftInCurrentFragment != 0) {
566  for (auto& varLenColInfoIt : varLenColInfo_) {
567  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
568  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
569  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
570  if (insertIdIt != inverseInsertDataColIdMap.end()) {
571  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
572  numRowsToInsert = std::min(
573  numRowsToInsert,
574  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
575  numRowsToInsert,
576  numRowsInserted,
577  bytesLeft));
578  }
579  }
580  }
581 
582  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
583  currentFragment = createNewFragment(defaultInsertLevel_);
584  if (numRowsInserted == 0) {
585  startFragment++;
586  }
587  rowsLeftInCurrentFragment = maxFragmentRows_;
588  for (auto& varLenColInfoIt : varLenColInfo_) {
589  varLenColInfoIt.second = 0; // reset byte counter
590  }
591  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
592  for (auto& varLenColInfoIt : varLenColInfo_) {
593  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
594  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
595  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
596  if (insertIdIt != inverseInsertDataColIdMap.end()) {
597  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
598  numRowsToInsert = std::min(
599  numRowsToInsert,
600  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
601  numRowsToInsert,
602  numRowsInserted,
603  bytesLeft));
604  }
605  }
606  }
607 
608  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
609  // never be able to insert anything
610 
611  // for each column, append the data in the appropriate insert buffer
612  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
613  int columnId = insertDataStruct.columnIds[i];
614  auto colMapIt = columnMap_.find(columnId);
615  CHECK(colMapIt != columnMap_.end());
616  currentFragment->shadowChunkMetadataMap[columnId] =
617  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
618  auto varLenColInfoIt = varLenColInfo_.find(columnId);
619  if (varLenColInfoIt != varLenColInfo_.end()) {
620  varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
621  }
622  }
623  if (hasMaterializedRowId_) {
624  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
625  currentFragment->shadowNumTuples;
626  int64_t* rowIdData = new int64_t[numRowsToInsert];
627  for (size_t i = 0; i < numRowsToInsert; ++i) {
628  rowIdData[i] = i + startId;
629  }
630  DataBlockPtr rowIdBlock;
631  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData);
632  auto colMapIt = columnMap_.find(rowIdColId_);
633  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
634  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
635  delete[] rowIdData;
636  }
637 
638  currentFragment->shadowNumTuples =
639  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
640  numRowsLeft -= numRowsToInsert;
641  numRowsInserted += numRowsToInsert;
642  }
643  {
644  // Only take the fragment info lock when updating fragment info map. Otherwise,
645  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
646  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
647  // COPY_FROM waits for TableWriteLock
648  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
649  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
650  partIt != fragmentInfoVec_.end();
651  ++partIt) {
652  auto fragment_ptr = partIt->get();
653  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
654  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
655  }
656  }
657  numTuples_ += insertDataStruct.numRows;
658  dropFragmentsToSize(maxRows_);
659 }
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void dropFragmentsToSize(const size_t maxRows) override
Will truncate the table to less than maxRows by dropping fragments.
#define CHECK_GT(x, y)
Definition: Logger.h:209
CHECK(cgen_state)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::unordered_map< int, size_t > varLenColInfo_
void replicateData(const InsertData &insertDataStruct)
std::map< int, Chunk_NS::Chunk > columnMap_
int8_t * numbersPtr
Definition: sqltypes.h:138

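
The loop above clamps numRowsToInsert twice: once by the rows left in the current fragment and once by the bytes left in each variable-length chunk. A worked example of the byte clamp with assumed numbers; the real bound comes from getNumElemsForBytesInsertData, which inspects the actual payload.

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  const size_t maxChunkSize = 1024;    // bytes allowed per chunk (placeholder)
  const size_t varLenBytesUsed = 900;  // bytes already in the current var-len chunk
  size_t numRowsToInsert = 50;         // rows that fit by row count alone
  const size_t avgBytesPerRow = 10;    // assumed per-row payload size

  const size_t bytesLeft = maxChunkSize - varLenBytesUsed;       // 124
  const size_t rowsThatFitByBytes = bytesLeft / avgBytesPerRow;  // 12

  numRowsToInsert = std::min(numRowsToInsert, rowsThatFitByBytes);
  std::cout << "rows appended to the current fragment: " << numRowsToInsert << '\n';  // 12
}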

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
override virtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these need to be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 370 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

370  {
371  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
372  mapd_unique_lock<mapd_shared_mutex> insertLock(
373  insertMutex_); // prevent two threads from trying to insert into the same table
374  // simultaneously
375  insertDataImpl(insertDataStruct);
376 }
void insertDataImpl(InsertData &insertDataStruct)


void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter & Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter & )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 378 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, catalog_(), CHECK(), Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

378  {
379  // synchronize concurrent accesses to fragmentInfoVec_
380  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
381  size_t numRowsLeft = insertDataStruct.numRows;
382  for (auto const& fragmentInfo : fragmentInfoVec_) {
383  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
384  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
385  size_t numRowsCanBeInserted;
386  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
387  if (insertDataStruct.bypass[i]) {
388  continue;
389  }
390  auto columnId = insertDataStruct.columnIds[i];
391  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
392  CHECK(colDesc);
393  CHECK(columnMap_.find(columnId) != columnMap_.end());
394 
395  ChunkKey chunkKey = chunkKeyPrefix_;
396  chunkKey.push_back(columnId);
397  chunkKey.push_back(fragmentInfo->fragmentId);
398 
399  auto colMapIt = columnMap_.find(columnId);
400  auto& chunk = colMapIt->second;
401  if (chunk.isChunkOnDevice(
402  dataMgr_,
403  chunkKey,
404  defaultInsertLevel_,
405  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
406  dataMgr_->deleteChunksWithPrefix(chunkKey);
407  }
408  chunk.createChunkBuffer(
409  dataMgr_,
410  chunkKey,
411  defaultInsertLevel_,
412  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
413  chunk.init_encoder();
414 
415  try {
416  DataBlockPtr dataCopy = insertDataStruct.data[i];
417  auto size = colDesc->columnType.get_size();
418  if (0 > size) {
419  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
420  varLenColInfo_[columnId] = 0;
421  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
422  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
423  } else {
424  numRowsCanBeInserted = maxChunkSize_ / size;
425  }
426 
427  // FIXME: abort a case in which new column is wider than existing columns
428  if (numRowsCanBeInserted < numRowsToInsert) {
429  throw std::runtime_error("new column '" + colDesc->columnName +
430  "' wider than existing columns is not supported");
431  }
432 
433  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
434  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
435 
436  // update total size of var-len column in (actually the last) fragment
437  if (0 > size) {
438  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
439  varLenColInfo_[columnId] = chunk.get_buffer()->size();
440  }
441  } catch (...) {
442  dataMgr_->deleteChunksWithPrefix(chunkKey);
443  throw;
444  }
445  }
446  numRowsLeft -= numRowsToInsert;
447  }
448  CHECK(0 == numRowsLeft);
449 
450  for (auto const& fragmentInfo : fragmentInfoVec_) {
451  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
452  }
453 }
std::vector< int > ChunkKey
Definition: types.h:35
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
std::shared_ptr< std::mutex > mutex_access_inmem_states
CHECK(cgen_state)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:415
std::unordered_map< int, size_t > varLenColInfo_
std::map< int, Chunk_NS::Chunk > columnMap_


void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inline override virtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
override virtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 242 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), ChunkMetadata::chunkStats, ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, showChunk(), VLOG, and logger::WARNING.

244  {
245  // synchronize concurrent accesses to fragmentInfoVec_
246  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
252  if (shard_ >= 0) {
253  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
254  }
255 
256  CHECK(cd);
257  const auto column_id = cd->columnId;
258  const auto col_itr = columnMap_.find(column_id);
259  CHECK(col_itr != columnMap_.end());
260 
261  for (auto const& fragment : fragmentInfoVec_) {
262  auto stats_itr = stats_map.find(fragment->fragmentId);
263  if (stats_itr != stats_map.end()) {
264  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
265  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
266  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
267  physicalTableId_,
268  column_id,
269  fragment->fragmentId};
270  auto chunk = Chunk_NS::Chunk::getChunk(cd,
271  &catalog_->getDataMgr(),
272  chunk_key,
274  0,
275  chunk_meta_it->second.numBytes,
276  chunk_meta_it->second.numElements);
277  auto buf = chunk->get_buffer();
278  CHECK(buf);
279  auto encoder = buf->encoder.get();
280  if (!encoder) {
281  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
282  }
283 
284  auto chunk_stats = stats_itr->second;
285 
286  ChunkMetadata old_chunk_metadata;
287  encoder->getMetadata(old_chunk_metadata);
288  auto& old_chunk_stats = old_chunk_metadata.chunkStats;
289 
290  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
291  // Use the logical type to display data, since the encoding should be ignored
292  const auto logical_ti = cd->columnType.is_dict_encoded_string()
294  : get_logical_type_info(cd->columnType);
295  if (!didResetStats) {
296  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
297  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
298  << DatumToString(chunk_stats.max, logical_ti);
299  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
300  << DatumToString(chunk_stats.min, logical_ti);
301  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
302  continue; // move to next fragment
303  }
304 
305  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
306  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
307  << DatumToString(chunk_stats.max, logical_ti);
308  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
309  << DatumToString(chunk_stats.min, logical_ti);
310  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
311 
312  // Reset fragment metadata map and set buffer to dirty
313  ChunkMetadata new_metadata;
314  // Run through fillChunkStats to ensure any transformations to the raw metadata
315  // values get applied (e.g. for date in days)
316  encoder->getMetadata(new_metadata);
317 
318  fragment->setChunkMetadata(column_id, new_metadata);
319  fragment->shadowChunkMetadataMap =
320  fragment->getChunkMetadataMap(); // TODO(adb): needed?
321  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
322  buf->setDirty();
323  }
324  } else {
325  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
326  << ", table " << physicalTableId_ << ", "
327  << ", column " << column_id;
328  }
329  }
330 }
std::vector< int > ChunkKey
Definition: types.h:35
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:224
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:183
#define LOG(tag)
Definition: Logger.h:188
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
Definition: sqltypes.h:796
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
CHECK(cgen_state)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:182
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
bool is_dict_encoded_string() const
Definition: sqltypes.h:425
SQLTypeInfo columnType
std::map< int, Chunk_NS::Chunk > columnMap_
#define VLOG(n)
Definition: Logger.h:291

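
A sketch of what a caller supplies to updateChunkStats. ToyChunkStats is a stand-in exposing only the min/max/has_nulls fields used in the listing (the real ChunkStats stores Datum values), and the commented-out call shows the intended entry point.

#include <iostream>
#include <unordered_map>

struct ToyChunkStats {
  long long min;
  long long max;
  bool has_nulls;
};

int main() {
  // One entry per fragment id, which is how updateChunkStats looks stats up.
  std::unordered_map<int, ToyChunkStats> stats_map;
  stats_map[0] = {1, 500, false};  // fragment 0: new stats after an external rewrite
  stats_map[3] = {7, 42, true};    // fragment 3

  // With the real types the call would be:
  //   fragmenter->updateChunkStats(column_descriptor, stats_map);
  // Fragments without an entry are skipped with a warning, as in the listing.
  for (const auto& entry : stats_map) {
    std::cout << "fragment " << entry.first << ": [" << entry.second.min << ", "
              << entry.second.max << "] nulls=" << std::boolalpha
              << entry.second.has_nulls << '\n';
  }
}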

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const std::string &  tab_name,
const std::string &  col_name,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
static

Definition at line 54 of file UpdelStorage.cpp.

References CHECK(), Catalog_Namespace::Catalog::getMetadataForColumn(), and Catalog_Namespace::Catalog::getMetadataForTable().

Referenced by updateColumn().

62  {
63  const auto td = catalog->getMetadataForTable(tab_name);
64  CHECK(td);
65  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
66  CHECK(cd);
67  td->fragmenter->updateColumn(catalog,
68  td,
69  cd,
70  fragment_id,
71  frag_offsets,
72  rhs_values,
73  rhs_type,
74  memory_level,
75  updel_roll);
76 }
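
A hedged call sketch of this by-name convenience overload (the table name, column name, fragment id, row offsets, and catalog pointer are all illustrative; catalog is assumed to be a valid Catalog_Namespace::Catalog*). It resolves the table and column descriptors and forwards to the table's fragmenter, as in the listing above:

  UpdelRoll updel_roll;
  const std::vector<uint64_t> frag_offsets{0, 5, 9};             // rows within the fragment
  const std::vector<ScalarTargetValue> rhs_values{int64_t(42)};  // broadcast to all rows
  Fragmenter_Namespace::InsertOrderFragmenter::updateColumn(catalog,
                                                            "sales",     // table name
                                                            "quantity",  // column name
                                                            /*fragment_id=*/0,
                                                            frag_offsets,
                                                            rhs_values,
                                                            SQLTypeInfo(kBIGINT, false),  // RHS type
                                                            Data_Namespace::CPU_LEVEL,
                                                            updel_roll);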

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 602 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

610  {
611  updel_roll.catalog = catalog;
612  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
613  updel_roll.memoryLevel = memory_level;
614 
615  const size_t ncore = cpu_threads();
616  const auto nrow = frag_offsets.size();
617  const auto n_rhs_values = rhs_values.size();
618  if (0 == nrow) {
619  return;
620  }
621  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
622 
623  auto fragment_ptr = getFragmentInfo(fragment_id);
624  auto& fragment = *fragment_ptr;
625  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
626  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
627  ChunkKey chunk_key{
628  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
629  auto chunk = Chunk_NS::Chunk::getChunk(cd,
630  &catalog->getDataMgr(),
631  chunk_key,
632  Data_Namespace::CPU_LEVEL,
633  0,
634  chunk_meta_it->second.numBytes,
635  chunk_meta_it->second.numElements);
636 
637  std::vector<int8_t> has_null_per_thread(ncore, 0);
638  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
639  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
640  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
641  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
642 
643  // parallel update elements
644  std::vector<std::future<void>> threads;
645 
646  const auto segsz = (nrow + ncore - 1) / ncore;
647  auto dbuf = chunk->get_buffer();
648  auto dbuf_addr = dbuf->getMemoryPtr();
649  dbuf->setUpdated();
650  {
651  std::lock_guard<std::mutex> lck(updel_roll.mutex);
652  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
653  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
654  }
655 
656  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
657  cd->tableId,
658  cd->columnId,
659  fragment.fragmentId};
660  updel_roll.dirtyChunkeys.insert(chunkey);
661  }
662  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
663  threads.emplace_back(std::async(
664  std::launch::async,
665  [=,
666  &has_null_per_thread,
667  &min_int64t_per_thread,
668  &max_int64t_per_thread,
669  &min_double_per_thread,
670  &max_double_per_thread,
671  &frag_offsets,
672  &rhs_values] {
673  SQLTypeInfo lhs_type = cd->columnType;
674 
675  // Note: it is unclear whether this is an undocumented convention or a bug, but
676  // for a sharded table the dictionary id of a dict-encoded string column is given
677  // by comp_param in the logical table rather than in the physical table (comp_param
678  // in the physical table is always 0), so we need to adapt accordingly.
679  auto cdl = (shard_ < 0)
680  ? cd
681  : catalog->getMetadataForColumn(
682  catalog->getLogicalTableId(td->tableId), cd->columnId);
683  CHECK(cdl);
684  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
685  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
686  lhs_type, &decimalOverflowValidator);
687  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
688  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
689  lhs_type, &dateDaysOverflowValidator);
690 
691  StringDictionary* stringDict{nullptr};
692  if (lhs_type.is_string()) {
693  CHECK(kENCODING_DICT == lhs_type.get_compression());
694  auto dictDesc = const_cast<DictDescriptor*>(
695  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
696  CHECK(dictDesc);
697  stringDict = dictDesc->stringDict.get();
698  CHECK(stringDict);
699  }
700 
701  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
702  const auto roffs = frag_offsets[r];
703  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
704  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
705  ScalarTargetValue sv2;
706 
707  // A subtle point: there are two cases of string-to-string assignment in which
708  // the upstream passes the RHS string as a string index instead of the preferred
709  // "real string".
710  // case #1. For "SET str_col = str_literal", a temporary string index cannot be
711  // resolved
712  // in this layer, so if the upstream passes a string index here, an
713  // exception is thrown.
714  // case #2. For "SET str_col1 = str_col2", the RHS string index is converted to
715  // the LHS string index.
716  if (rhs_type.is_string()) {
717  if (const auto vp = boost::get<int64_t>(sv)) {
718  auto dictDesc = const_cast<DictDescriptor*>(
719  catalog->getMetadataForDict(rhs_type.get_comp_param()));
720  if (nullptr == dictDesc) {
721  throw std::runtime_error(
722  "UPDATE does not support cast from string literal to string "
723  "column.");
724  }
725  auto stringDict = dictDesc->stringDict.get();
726  CHECK(stringDict);
727  sv2 = NullableString(stringDict->getString(*vp));
728  sv = &sv2;
729  }
730  }
731 
732  if (const auto vp = boost::get<int64_t>(sv)) {
733  auto v = *vp;
734  if (lhs_type.is_string()) {
735  throw std::runtime_error("UPDATE does not support cast to string.");
736  }
737  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
738  if (lhs_type.is_decimal()) {
739  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
740  int64_t decimal_val;
741  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
742  tabulate_metadata(lhs_type,
743  min_int64t_per_thread[c],
744  max_int64t_per_thread[c],
745  has_null_per_thread[c],
746  (v == inline_int_null_value<int64_t>() &&
747  lhs_type.get_notnull() == false)
748  ? v
749  : decimal_val);
750  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
751  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
752  if (positive_v_and_negative_d || negative_v_and_positive_d) {
753  throw std::runtime_error(
754  "Data conversion overflow on " + std::to_string(v) +
755  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
756  std::to_string(rhs_type.get_scale()) + ") to (" +
757  std::to_string(lhs_type.get_dimension()) + ", " +
758  std::to_string(lhs_type.get_scale()) + ")");
759  }
760  } else if (is_integral(lhs_type)) {
761  if (lhs_type.is_date_in_days()) {
762  // Store meta values in seconds
763  if (lhs_type.get_size() == 2) {
764  nullAwareDateOverflowValidator.validate<int16_t>(v);
765  } else {
766  nullAwareDateOverflowValidator.validate<int32_t>(v);
767  }
768  int64_t days;
769  get_scalar<int64_t>(data_ptr, lhs_type, days);
770  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
771  tabulate_metadata(lhs_type,
772  min_int64t_per_thread[c],
773  max_int64t_per_thread[c],
774  has_null_per_thread[c],
775  (v == inline_int_null_value<int64_t>() &&
776  lhs_type.get_notnull() == false)
777  ? NullSentinelSupplier()(lhs_type, v)
778  : seconds);
779  } else {
780  int64_t target_value;
781  if (rhs_type.is_decimal()) {
782  target_value = round(decimal_to_double(rhs_type, v));
783  } else {
784  target_value = v;
785  }
786  tabulate_metadata(lhs_type,
787  min_int64t_per_thread[c],
788  max_int64t_per_thread[c],
789  has_null_per_thread[c],
790  target_value);
791  }
792  } else {
793  tabulate_metadata(
794  lhs_type,
795  min_double_per_thread[c],
796  max_double_per_thread[c],
797  has_null_per_thread[c],
798  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
799  }
800  } else if (const auto vp = boost::get<double>(sv)) {
801  auto v = *vp;
802  if (lhs_type.is_string()) {
803  throw std::runtime_error("UPDATE does not support cast to string.");
804  }
805  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
806  if (lhs_type.is_integer()) {
807  tabulate_metadata(lhs_type,
808  min_int64t_per_thread[c],
809  max_int64t_per_thread[c],
810  has_null_per_thread[c],
811  int64_t(v));
812  } else if (lhs_type.is_fp()) {
813  tabulate_metadata(lhs_type,
814  min_double_per_thread[c],
815  max_double_per_thread[c],
816  has_null_per_thread[c],
817  double(v));
818  } else {
819  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
820  "LHS with a floating RHS.";
821  }
822  } else if (const auto vp = boost::get<float>(sv)) {
823  auto v = *vp;
824  if (lhs_type.is_string()) {
825  throw std::runtime_error("UPDATE does not support cast to string.");
826  }
827  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
828  if (lhs_type.is_integer()) {
829  tabulate_metadata(lhs_type,
830  min_int64t_per_thread[c],
831  max_int64t_per_thread[c],
832  has_null_per_thread[c],
833  int64_t(v));
834  } else {
835  tabulate_metadata(lhs_type,
836  min_double_per_thread[c],
837  max_double_per_thread[c],
838  has_null_per_thread[c],
839  double(v));
840  }
841  } else if (const auto vp = boost::get<NullableString>(sv)) {
842  const auto s = boost::get<std::string>(vp);
843  const auto sval = s ? *s : std::string("");
844  if (lhs_type.is_string()) {
845  decltype(stringDict->getOrAdd(sval)) sidx;
846  {
847  std::unique_lock<std::mutex> lock(temp_mutex_);
848  sidx = stringDict->getOrAdd(sval);
849  }
850  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
851  tabulate_metadata(lhs_type,
852  min_int64t_per_thread[c],
853  max_int64t_per_thread[c],
854  has_null_per_thread[c],
855  int64_t(sidx));
856  } else if (sval.size() > 0) {
857  auto dval = std::atof(sval.data());
858  if (lhs_type.is_boolean()) {
859  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
860  } else if (lhs_type.is_time()) {
861  throw std::runtime_error(
862  "Date/Time/Timestamp update not supported through translated "
863  "string path.");
864  }
865  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
866  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
867  tabulate_metadata(lhs_type,
868  min_double_per_thread[c],
869  max_double_per_thread[c],
870  has_null_per_thread[c],
871  double(dval));
872  } else {
873  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
874  tabulate_metadata(lhs_type,
875  min_int64t_per_thread[c],
876  max_int64t_per_thread[c],
877  has_null_per_thread[c],
878  int64_t(dval));
879  }
880  } else {
881  put_null(data_ptr, lhs_type, cd->columnName);
882  has_null_per_thread[c] = true;
883  }
884  } else {
885  CHECK(false);
886  }
887  }
888  }));
889  if (threads.size() >= (size_t)cpu_threads()) {
890  wait_cleanup_threads(threads);
891  }
892  }
893  wait_cleanup_threads(threads);
894 
895  // for unit test
896  if (fragment.unconditionalVacuum_) {
897  if (cd->isDeletedCol) {
898  const auto deleted_offsets = getVacuumOffsets(chunk);
899  if (deleted_offsets.size() > 0) {
900  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
901  return;
902  }
903  }
904  }
905  bool has_null_per_chunk{false};
906  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
907  double min_double_per_chunk{std::numeric_limits<double>::max()};
908  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
909  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
910  for (size_t c = 0; c < ncore; ++c) {
911  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
912  max_double_per_chunk =
913  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
914  min_double_per_chunk =
915  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
916  max_int64t_per_chunk =
917  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
918  min_int64t_per_chunk =
919  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
920  }
921  updateColumnMetadata(cd,
922  fragment,
923  chunk,
924  has_null_per_chunk,
925  max_double_per_chunk,
926  min_double_per_chunk,
927  max_int64t_per_chunk,
928  min_int64t_per_chunk,
929  cd->columnType,
930  updel_roll);
931 }
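
Two details of the listing above are easy to miss: rhs_values may hold either one entry per target row or a single entry broadcast to every row (the CHECK at line 621), and rows are split across threads in contiguous blocks of segsz = (nrow + ncore - 1) / ncore elements (line 646). A small illustration with made-up numbers:

  // Work partitioning: with nrow = 10 rows and ncore = 4 threads,
  // segsz = (10 + 4 - 1) / 4 = 3, so threads cover rows [0,3), [3,6), [6,9), [9,10).

  // Accepted shapes for rhs_values when updating three rows:
  std::vector<ScalarTargetValue> per_row{int64_t(1), int64_t(2), int64_t(3)};  // nrow == n_rhs_values
  std::vector<ScalarTargetValue> broadcast{int64_t(7)};                        // 1 == n_rhs_values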

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 78 of file UpdelStorage.cpp.

References updateColumn().

86  {
87  updateColumn(catalog,
88  td,
89  cd,
90  fragment_id,
91  frag_offsets,
92  std::vector<ScalarTargetValue>(1, rhs_value),
93  rhs_type,
94  memory_level,
95  updel_roll);
96 }

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 933 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and ColumnDescriptor::tableId.

Referenced by compactRows(), and updateColumn().

942  {
943  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
944  auto key = std::make_pair(td, &fragment);
945  std::lock_guard<std::mutex> lck(updel_roll.mutex);
946  if (0 == updel_roll.chunkMetadata.count(key)) {
947  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
948  }
949  if (0 == updel_roll.numTuples.count(key)) {
950  updel_roll.numTuples[key] = fragment.shadowNumTuples;
951  }
952  auto& chunkMetadata = updel_roll.chunkMetadata[key];
953 
954  auto buffer = chunk->get_buffer();
955  const auto& lhs_type = cd->columnType;
956 
957  auto update_stats = [& encoder = buffer->encoder](auto min, auto max, auto has_null) {
958  static_assert(std::is_same<decltype(min), decltype(max)>::value,
959  "Type mismatch on min/max");
960  if (has_null) {
961  encoder->updateStats(decltype(min)(), true);
962  }
963  if (max < min) {
964  return;
965  }
966  encoder->updateStats(min, false);
967  encoder->updateStats(max, false);
968  };
969 
970  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
971  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
972  } else if (lhs_type.is_fp()) {
973  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
974  } else if (lhs_type.is_decimal()) {
975  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
976  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
977  has_null_per_chunk);
978  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
979  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
980  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
981  }
982  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
983 
984  // removed as @alex suggests. keep it commented in case of any chance to revisit
985  // it once after vacuum code is introduced. fragment.invalidateChunkMetadataMap();
986 }
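
A worked example of the decimal branch above (the column type and values are illustrative; the project headers and <cmath> are assumed to be included). Per-chunk stats for decimal columns are tracked as doubles and rescaled into the encoded integer domain before being handed to the encoder:

  SQLTypeInfo decimal_ti(kDECIMAL, 10, 2, false);  // DECIMAL(10,2), nullable
  double min_double_per_chunk = 12.5;
  double max_double_per_chunk = 99.25;
  int64_t encoded_min =
      (int64_t)(min_double_per_chunk * pow(10, decimal_ti.get_scale()));  // == 1250
  int64_t encoded_max =
      (int64_t)(max_double_per_chunk * pow(10, decimal_ti.get_scale()));  // == 9925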

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 291 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), Executor::getExecutor(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, Asio::start(), TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

300  {
301  if (!is_feature_enabled<VarlenUpdates>()) {
302  throw std::runtime_error("varlen UPDATE path not enabled.");
303  }
304 
305  updelRoll.is_varlen_update = true;
306  updelRoll.catalog = catalog;
307  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
308  updelRoll.memoryLevel = memoryLevel;
309 
310  size_t num_entries = sourceDataProvider.getEntryCount();
311  size_t num_rows = sourceDataProvider.getRowCount();
312 
313  if (0 == num_rows) {
314  // bail out early
315  return;
316  }
317 
318  TargetValueConverterFactory factory;
319 
320  auto fragment_ptr = getFragmentInfo(fragmentId);
321  auto& fragment = *fragment_ptr;
322  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
323  get_chunks(catalog, td, fragment, memoryLevel, chunks);
324  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
325  columnDescriptors.size());
326  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
327  std::shared_ptr<Executor> executor;
328 
329  if (g_enable_experimental_string_functions) {
330  executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
331  }
332 
333  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
334  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
335  auto chunk = chunks[indexOfChunk];
336  const auto chunk_cd = chunk->get_column_desc();
337 
338  if (chunk_cd->isDeletedCol) {
339  deletedChunk = chunk;
340  continue;
341  }
342 
343  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
344  columnDescriptors.end(),
345  [=](const ColumnDescriptor* cd) -> bool {
346  return cd->columnId == chunk_cd->columnId;
347  });
348 
349  if (targetColumnIt != columnDescriptors.end()) {
350  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
351 
352  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
353  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
354 
355  ConverterCreateParameter param{
356  num_rows,
356  num_rows,
357  *catalog,
358  sourceDataMetaInfo,
359  targetDescriptor,
360  targetDescriptor->columnType,
361  !targetDescriptor->columnType.get_notnull(),
362  sourceDataProvider.getLiteralDictionary(),
363  g_enable_experimental_string_functions
364  ? executor->getStringDictionaryProxy(
365  sourceDataMetaInfo.get_type_info().get_comp_param(),
366  executor->getRowSetMemoryOwner(),
367  true)
368  : nullptr};
369  auto converter = factory.create(param);
370  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
371 
372  if (targetDescriptor->columnType.is_geometry()) {
373  // geometry columns are composites
374  // need to skip chunks, depending on geo type
375  switch (targetDescriptor->columnType.get_type()) {
376  case kMULTIPOLYGON:
377  indexOfChunk += 5;
378  break;
379  case kPOLYGON:
380  indexOfChunk += 4;
381  break;
382  case kLINESTRING:
383  indexOfChunk += 2;
384  break;
385  case kPOINT:
386  indexOfChunk += 1;
387  break;
388  default:
389  CHECK(false); // not supported
390  }
391  }
392  } else {
393  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
394  std::unique_ptr<ChunkToInsertDataConverter> converter;
395 
396  if (chunk_cd->columnType.is_fixlen_array()) {
397  converter =
398  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
399  } else if (chunk_cd->columnType.is_string()) {
400  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
401  } else if (chunk_cd->columnType.is_geometry()) {
402  // the logical geo column is a string column
403  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
404  } else {
405  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
406  }
407 
408  chunkConverters.push_back(std::move(converter));
409 
410  } else if (chunk_cd->columnType.is_date_in_days()) {
411  /* Q: Why do we need this?
412  A: In variable length updates path we move the chunk content of column
413  without decoding. Since it again passes through DateDaysEncoder
414  the expected value should be in seconds, but here it will be in days.
415  Therefore, using DateChunkConverter chunk values are being scaled to
416  seconds which then ultimately encoded in days in DateDaysEncoder.
417  */
418  std::unique_ptr<ChunkToInsertDataConverter> converter;
419  const size_t physical_size = chunk_cd->columnType.get_size();
420  if (physical_size == 2) {
421  converter =
422  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
423  } else if (physical_size == 4) {
424  converter =
425  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
426  } else {
427  CHECK(false);
428  }
429  chunkConverters.push_back(std::move(converter));
430  } else {
431  std::unique_ptr<ChunkToInsertDataConverter> converter;
432  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
433  int logical_size = logical_type.get_size();
434  int physical_size = chunk_cd->columnType.get_size();
435 
436  if (logical_type.is_string()) {
437  // for dicts -> logical = physical
438  logical_size = physical_size;
439  }
440 
441  if (8 == physical_size) {
442  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
443  num_rows, chunk.get());
444  } else if (4 == physical_size) {
445  if (8 == logical_size) {
446  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
447  num_rows, chunk.get());
448  } else {
449  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
450  num_rows, chunk.get());
451  }
452  } else if (2 == chunk_cd->columnType.get_size()) {
453  if (8 == logical_size) {
454  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
455  num_rows, chunk.get());
456  } else if (4 == logical_size) {
457  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
458  num_rows, chunk.get());
459  } else {
460  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
461  num_rows, chunk.get());
462  }
463  } else if (1 == chunk_cd->columnType.get_size()) {
464  if (8 == logical_size) {
465  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
466  num_rows, chunk.get());
467  } else if (4 == logical_size) {
468  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
469  num_rows, chunk.get());
470  } else if (2 == logical_size) {
471  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
472  num_rows, chunk.get());
473  } else {
474  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
475  num_rows, chunk.get());
476  }
477  } else {
478  CHECK(false); // unknown
479  }
480 
481  chunkConverters.push_back(std::move(converter));
482  }
483  }
484  }
485 
486  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
487  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
488 
489  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
490  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
491  deletedChunk->get_column_desc()->tableId,
492  deletedChunk->get_column_desc()->columnId,
493  fragment.fragmentId};
494  updelRoll.dirtyChunkeys.insert(chunkey);
495  bool* deletedChunkBuffer =
496  reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());
497 
498  std::atomic<size_t> row_idx{0};
499 
500  auto row_converter = [&sourceDataProvider,
501  &sourceDataConverters,
502  &indexOffFragmentOffsetColumn,
503  &chunkConverters,
504  &deletedChunkBuffer,
505  &row_idx](size_t indexOfEntry) -> void {
506  // convert the source data
507  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
508  if (row.empty()) {
509  return;
510  }
511 
512  size_t indexOfRow = row_idx.fetch_add(1);
513 
514  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
515  if (sourceDataConverters[col]) {
516  const auto& mapd_variant = row[col];
517  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
518  }
519  }
520 
521  auto scalar = checked_get(
522  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
523  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
524 
525  // convert the remaining chunks
526  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
527  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
528  }
529 
530  // now mark the row as deleted
531  deletedChunkBuffer[indexInChunkBuffer] = true;
532  };
533 
534  bool can_go_parallel = num_rows > 20000;
535 
536  if (can_go_parallel) {
537  const size_t num_worker_threads = cpu_threads();
538  std::vector<std::future<void>> worker_threads;
539  for (size_t i = 0,
540  start_entry = 0,
541  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
542  i < num_worker_threads && start_entry < num_entries;
543  ++i, start_entry += stride) {
544  const auto end_entry = std::min(start_entry + stride, num_rows);
545  worker_threads.push_back(std::async(
546  std::launch::async,
547  [&row_converter](const size_t start, const size_t end) {
548  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
549  row_converter(indexOfRow);
550  }
551  },
552  start_entry,
553  end_entry));
554  }
555 
556  for (auto& child : worker_threads) {
557  child.wait();
558  }
559 
560  } else {
561  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
562  row_converter(entryIdx);
563  }
564  }
565 
566  InsertData insert_data;
567  insert_data.databaseId = catalog->getCurrentDB().dbId;
568  insert_data.tableId = td->tableId;
569 
570  for (size_t i = 0; i < chunkConverters.size(); i++) {
571  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
572  continue;
573  }
574 
575  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
576  if (sourceDataConverters[i]) {
577  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
578  }
579  continue;
580  }
581 
582  insert_data.numRows = num_rows;
583  insertDataNoCheckpoint(insert_data);
584 
585  // update metadata
586  if (!deletedChunk->get_buffer()->has_encoder) {
587  deletedChunk->init_encoder();
588  }
589  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);
590 
591  auto& shadowDeletedChunkMeta =
592  fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
593  if (shadowDeletedChunkMeta.numElements >
594  deletedChunk->get_buffer()->encoder->getNumElems()) {
595  // the append will have populated shadow meta data, otherwise use existing num
596  // elements
597  deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
598  }
599  deletedChunk->get_buffer()->setUpdated();
600 }
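
A worked example of the date-in-days conversion described in the comment at lines 411-417 (the value is illustrative). The converter re-expresses stored day counts as seconds so that DateDaysEncoder on the insert path can encode them back into days:

  int64_t days = 18262;  // 2020-01-01 counted in days since 1970-01-01
  int64_t seconds = DateConverters::get_epoch_seconds_from_days(days);
  // seconds == 18262 * 86400 == 1577836800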

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
override virtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 988 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

990  {
991  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
992  if (updel_roll.chunkMetadata.count(key)) {
993  auto& fragmentInfo = *key.second;
994  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
995  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
996  fragmentInfo.setChunkMetadataMap(chunkMetadata);
997  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
998  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
999  // TODO(ppan): When fragment-level compaction is enabled, the following code
1000  // should suffice. When it is not (i.e. the existing code), we'll revert to
1001  // updating InsertOrderFragmenter::varLenColInfo_
1002  /*
1003  for (const auto cit : chunkMetadata) {
1004  const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
1005  if (cd.columnType.get_size() < 0)
1006  fragmentInfo.varLenColInfo_[cd.columnId] = cit.second.numBytes;
1007  }
1008  */
1009  }
1010 }
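
A hedged commit sketch (the fragmenter, td, fragment, catalog, and updel_roll variables are assumed to be in scope): once updateColumn()/updateColumnMetadata() have recorded new per-fragment metadata in the UpdelRoll, updateMetadata() publishes it to the fragment under fragmentInfoMutex_. The key mirrors the make_pair(td, &fragment) usage in updateColumnMetadata():

  MetaDataKey key = std::make_pair(td, &fragment);
  fragmenter->updateMetadata(catalog, key, updel_roll);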
auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1110 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1113  {
1114  const auto cd = chunk->get_column_desc();
1115  const auto& col_type = cd->columnType;
1116  auto data_buffer = chunk->get_buffer();
1117  auto data_addr = data_buffer->getMemoryPtr();
1118  auto element_size =
1119  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1120  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1121  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1122  size_t nbytes_fix_data_to_keep = 0;
1123  auto nrows_to_vacuum = frag_offsets.size();
1124  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1125  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1126  auto is_last_one = irow == nrows_to_vacuum;
1127  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1128  auto maddr_to_vacuum = data_addr;
1129  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1130  if (nrows_to_keep > 0) {
1131  auto nbytes_to_keep = nrows_to_keep * element_size;
1132  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1133  // move curr fixlen row block toward front
1134  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1135  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1136  nbytes_to_keep);
1137  }
1138  irow_of_blk_to_fill += nrows_to_keep;
1139  nbytes_fix_data_to_keep += nbytes_to_keep;
1140  }
1141  irow_of_blk_to_keep = irow_to_vacuum + 1;
1142  }
1143  return nbytes_fix_data_to_keep;
1144 }
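
As a worked example (values illustrative): vacuuming frag_offsets {2, 5} from a fragment holding 8 rows of a 4-byte fixed-length column keeps the row blocks [0,2), [3,5), and [6,8). Blocks after the first are memmove'd toward the front of the buffer, and the function returns 6 rows * 4 bytes = 24 bytes of fixed-length data to keep.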


auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1146 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1149  {
1150  auto data_buffer = chunk->get_buffer();
1151  auto index_buffer = chunk->get_index_buf();
1152  auto data_addr = data_buffer->getMemoryPtr();
1153  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1154  auto index_array = (StringOffsetT*)indices_addr;
1155  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1156  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1157  size_t nbytes_fix_data_to_keep = 0;
1158  size_t nbytes_var_data_to_keep = 0;
1159  auto nrows_to_vacuum = frag_offsets.size();
1160  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1161  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1162  auto is_last_one = irow == nrows_to_vacuum;
1163  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1164  auto maddr_to_vacuum = data_addr;
1165  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1166  if (nrows_to_keep > 0) {
1167  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1168  auto nbytes_to_keep =
1169  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1170  index_array[irow_of_blk_to_keep];
1171  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1172  // move curr varlen row block toward front
1173  memmove(data_addr + ibyte_var_data_to_keep,
1174  data_addr + index_array[irow_of_blk_to_keep],
1175  nbytes_to_keep);
1176 
1177  const auto index_base = index_array[irow_of_blk_to_keep];
1178  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1179  auto& index = index_array[irow_of_blk_to_keep + i];
1180  index = ibyte_var_data_to_keep + (index - index_base);
1181  }
1182  }
1183  nbytes_var_data_to_keep += nbytes_to_keep;
1184  maddr_to_vacuum = indices_addr;
1185 
1186  constexpr static auto index_element_size = sizeof(StringOffsetT);
1187  nbytes_to_keep = nrows_to_keep * index_element_size;
1188  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1189  // move curr fixlen row block toward front
1190  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1191  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1192  nbytes_to_keep);
1193  }
1194  irow_of_blk_to_fill += nrows_to_keep;
1195  nbytes_fix_data_to_keep += nbytes_to_keep;
1196  }
1197  irow_of_blk_to_keep = irow_to_vacuum + 1;
1198  }
1199  return nbytes_var_data_to_keep;
1200 }
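
The variable-length case applies the same block compaction to both the payload and the offset index: for each kept block, the payload bytes are moved to start at the running total of bytes kept so far, each surviving offset entry is rebased as index[i] = bytes_kept_so_far + (index[i] - offset_of_first_kept_row), and the offset entries themselves are then compacted like fixed-length rows.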

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 183 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 185 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 189 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 206 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 200 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 187 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 208 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 204 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 198 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 193 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 211 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 194 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 209 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 192 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutable protected

Definition at line 234 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 207 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 210 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: