OmniSciDB  06b3bd477c
Fragmenter_Namespace::InsertOrderFragmenter Class Reference

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter. More...

#include <InsertOrderFragmenter.h>


Public Types

using ModifyTransactionTracker = UpdelRoll
 

Public Member Functions

 InsertOrderFragmenter (const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
 
 ~InsertOrderFragmenter () override
 
TableInfo getFragmentsForQuery () override
 returns a TableInfo object containing the ids and row counts of all fragments More...
 
void insertData (InsertData &insertDataStruct) override
 appends data onto the most recently occurring fragment, creating a new one if necessary More...
 
void insertDataNoCheckpoint (InsertData &insertDataStruct) override
 Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these must be managed externally. More...
 
void dropFragmentsToSize (const size_t maxRows) override
 Will truncate table to less than maxRows by dropping fragments. More...
 
void updateChunkStats (const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map) override
 Update chunk stats. More...
 
FragmentInfo * getFragmentInfo (const int fragment_id) const override
 Retrieve the fragment info object for an individual fragment for editing. More...
 
int getFragmenterId () override
 get fragmenter's id More...
 
std::vector< int > getChunkKeyPrefix () const
 
std::string getFragmenterType () override
 get fragmenter's type (as a string) More...
 
size_t getNumRows () override
 
void setNumRows (const size_t numTuples) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumns (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
 
void updateColumn (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const ScalarTargetValue &rhs_value, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
void updateColumnMetadata (const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const bool null, const double dmax, const double dmin, const int64_t lmax, const int64_t lmin, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
 
void updateMetadata (const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
 
void compactRows (const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
 
const std::vector< uint64_t > getVacuumOffsets (const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
 
auto getChunksForAllColumns (const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
 
void dropColumns (const std::vector< int > &columnIds) override
 
bool hasDeletedRows (const int delete_column_id) override
 Iterates through chunk metadata to return whether any rows have been deleted. More...
 
- Public Member Functions inherited from Fragmenter_Namespace::AbstractFragmenter
virtual ~AbstractFragmenter ()
 

Static Public Member Functions

static void updateColumn (const Catalog_Namespace::Catalog *catalog, const std::string &tab_name, const std::string &col_name, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll)
 

Protected Member Functions

FragmentInfo * createNewFragment (const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
 creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table. More...
 
void deleteFragments (const std::vector< int > &dropFragIds)
 
void getChunkMetadata ()
 
void lockInsertCheckpointData (const InsertData &insertDataStruct)
 
void insertDataImpl (InsertData &insertDataStruct)
 
void replicateData (const InsertData &insertDataStruct)
 
 InsertOrderFragmenter (const InsertOrderFragmenter &)
 
InsertOrderFragmenter & operator= (const InsertOrderFragmenter &)
 
FragmentInfo & getFragmentInfoFromId (const int fragment_id)
 
auto vacuum_fixlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 
auto vacuum_varlen_rows (const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
 

Protected Attributes

std::vector< int > chunkKeyPrefix_
 
std::map< int, Chunk_NS::Chunk > columnMap_
 
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
 
Data_Namespace::DataMgr * dataMgr_
 
Catalog_Namespace::Catalog * catalog_
 
const int physicalTableId_
 
const int shard_
 
size_t maxFragmentRows_
 
size_t pageSize_
 
size_t numTuples_
 
int maxFragmentId_
 
size_t maxChunkSize_
 
size_t maxRows_
 
std::string fragmenterType_
 
mapd_shared_mutex fragmentInfoMutex_
 
mapd_shared_mutex insertMutex_
 
Data_Namespace::MemoryLevel defaultInsertLevel_
 
const bool uses_foreign_storage_
 
bool hasMaterializedRowId_
 
int rowIdColId_
 
std::unordered_map< int, size_t > varLenColInfo_
 
std::shared_ptr< std::mutex > mutex_access_inmem_states
 
std::mutex temp_mutex_
 

Detailed Description

The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order. Likely the default fragmenter.

InsertOrderFragmenter

Definition at line 52 of file InsertOrderFragmenter.h.
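For orientation, the sketch below is a minimal, self-contained illustration (not the OmniSciDB implementation) of the insert-order policy described above: rows are always appended to the most recent fragment, and a new fragment is opened once the per-fragment row limit is reached. All names and values in it (ToyFragment, append_rows, the limits) are purely illustrative.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative stand-in for FragmentInfo: just an id and a row count.
struct ToyFragment {
  int fragment_id;
  std::size_t num_rows;
};

// Append `num_rows` rows in insert order, filling the last fragment first and
// opening new fragments of at most `max_fragment_rows` rows as needed.
void append_rows(std::vector<ToyFragment>& fragments,
                 std::size_t num_rows,
                 std::size_t max_fragment_rows) {
  if (fragments.empty()) {
    fragments.push_back({0, 0});
  }
  while (num_rows > 0) {
    ToyFragment& last = fragments.back();
    const std::size_t space_left = max_fragment_rows - last.num_rows;
    if (space_left == 0) {
      fragments.push_back({last.fragment_id + 1, 0});  // current fragment is full
      continue;
    }
    const std::size_t to_insert = std::min(space_left, num_rows);
    last.num_rows += to_insert;
    num_rows -= to_insert;
  }
}

int main() {
  std::vector<ToyFragment> fragments;
  append_rows(fragments, 250, 100);  // yields 3 fragments: 100, 100, 50 rows
  for (const auto& f : fragments) {
    std::cout << "fragment " << f.fragment_id << ": " << f.num_rows << " rows\n";
  }
}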

Member Typedef Documentation

Constructor & Destructor Documentation

Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const std::vector< int >  chunkKeyPrefix,
std::vector< Chunk_NS::Chunk > &  chunkVec,
Data_Namespace::DataMgr *  dataMgr,
Catalog_Namespace::Catalog *  catalog,
const int  physicalTableId,
const int  shard,
const size_t  maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
const size_t  maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
const size_t  pageSize = DEFAULT_PAGE_SIZE,
const size_t  maxRows = DEFAULT_MAX_ROWS,
const Data_Namespace::MemoryLevel  defaultInsertLevel = Data_Namespace::DISK_LEVEL,
const bool  uses_foreign_storage = false 
)
Fragmenter_Namespace::InsertOrderFragmenter::~InsertOrderFragmenter ( )
override

Definition at line 92 of file InsertOrderFragmenter.cpp.

92 {}
Fragmenter_Namespace::InsertOrderFragmenter::InsertOrderFragmenter ( const InsertOrderFragmenter )
protected

Member Function Documentation

void Fragmenter_Namespace::InsertOrderFragmenter::compactRows ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1184 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), getChunksForAllColumns(), getFragmentInfo(), UpdelRoll::numTuples, Fragmenter_Namespace::set_chunk_metadata(), Fragmenter_Namespace::set_chunk_stats(), updateColumnMetadata(), vacuum_fixlen_rows(), vacuum_varlen_rows(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1189  {
1190  auto fragment_ptr = getFragmentInfo(fragment_id);
1191  auto& fragment = *fragment_ptr;
1192  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1193  const auto ncol = chunks.size();
1194 
1195  std::vector<int8_t> has_null_per_thread(ncol, 0);
1196  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1197  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1198  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::min());
1199  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<uint64_t>::max());
1200 
1201  // parallel delete columns
1202  std::vector<std::future<void>> threads;
1203  auto nrows_to_vacuum = frag_offsets.size();
1204  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1205  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1206 
1207  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1208  auto chunk = chunks[ci];
1209  const auto cd = chunk->getColumnDesc();
1210  const auto& col_type = cd->columnType;
1211  auto data_buffer = chunk->getBuffer();
1212  auto index_buffer = chunk->getIndexBuf();
1213  auto data_addr = data_buffer->getMemoryPtr();
1214  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1215  auto index_array = (StringOffsetT*)indices_addr;
1216  bool is_varlen = col_type.is_varlen_indeed();
1217 
1218  auto fixlen_vacuum = [=,
1219  &has_null_per_thread,
1220  &max_double_per_thread,
1221  &min_double_per_thread,
1222  &min_int64t_per_thread,
1223  &max_int64t_per_thread,
1224  &updel_roll,
1225  &frag_offsets,
1226  &fragment] {
1227  size_t nbytes_fix_data_to_keep;
1228  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1229 
1230  data_buffer->encoder->setNumElems(nrows_to_keep);
1231  data_buffer->setSize(nbytes_fix_data_to_keep);
1232  data_buffer->setUpdated();
1233 
1234  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1235 
1236  auto daddr = data_addr;
1237  auto element_size =
1238  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1239  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1240  if (col_type.is_fixlen_array()) {
1241  auto encoder =
1242  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1243  CHECK(encoder);
1244  encoder->updateMetadata((int8_t*)daddr);
1245  } else if (col_type.is_fp()) {
1246  set_chunk_stats(col_type,
1247  data_addr,
1248  has_null_per_thread[ci],
1249  min_double_per_thread[ci],
1250  max_double_per_thread[ci]);
1251  } else {
1252  set_chunk_stats(col_type,
1253  data_addr,
1254  has_null_per_thread[ci],
1255  min_int64t_per_thread[ci],
1256  max_int64t_per_thread[ci]);
1257  }
1258  }
1259  };
1260 
1261  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1262  size_t nbytes_var_data_to_keep;
1263  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1264 
1265  data_buffer->encoder->setNumElems(nrows_to_keep);
1266  data_buffer->setSize(nbytes_var_data_to_keep);
1267  data_buffer->setUpdated();
1268 
1269  index_array[nrows_to_keep] = data_buffer->size();
1270  index_buffer->setSize(sizeof(*index_array) *
1271  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1272  index_buffer->setUpdated();
1273 
1274  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1275  };
1276 
1277  if (is_varlen) {
1278  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1279  } else {
1280  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1281  }
1282  if (threads.size() >= (size_t)cpu_threads()) {
1283  wait_cleanup_threads(threads);
1284  }
1285  }
1286 
1287  wait_cleanup_threads(threads);
1288 
1289  auto key = std::make_pair(td, &fragment);
1290  updel_roll.numTuples[key] = nrows_to_keep;
1291  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1292  auto chunk = chunks[ci];
1293  auto cd = chunk->getColumnDesc();
1294  if (!cd->columnType.is_fixlen_array()) {
1295  updateColumnMetadata(cd,
1296  fragment,
1297  chunk,
1298  has_null_per_thread[ci],
1299  max_double_per_thread[ci],
1300  min_double_per_thread[ci],
1301  max_int64t_per_thread[ci],
1302  min_int64t_per_thread[ci],
1303  cd->columnType,
1304  updel_roll);
1305  }
1306  }
1307 }
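The listing above dispatches one vacuum task per chunk with std::async and drains the queue via wait_cleanup_threads() whenever the number of in-flight tasks reaches cpu_threads(). The standalone sketch below (illustrative names only, not the OmniSciDB API) shows the same bounded fan-out pattern.

#include <algorithm>
#include <cstddef>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Wait for all queued tasks and clear the queue (plays the role of
// Fragmenter_Namespace::wait_cleanup_threads in the listing above).
void wait_all(std::vector<std::future<void>>& tasks) {
  for (auto& t : tasks) {
    t.wait();
  }
  tasks.clear();
}

int main() {
  const std::size_t max_in_flight = std::max(1u, std::thread::hardware_concurrency());
  std::vector<std::future<void>> tasks;
  for (int chunk = 0; chunk < 10; ++chunk) {
    tasks.emplace_back(std::async(std::launch::async, [chunk] {
      // Per-chunk work (vacuuming a column chunk in the real code).
      // Output from different tasks may interleave.
      std::cout << "processing chunk " << chunk << "\n";
    }));
    if (tasks.size() >= max_in_flight) {
      wait_all(tasks);  // bound the number of concurrent tasks
    }
  }
  wait_all(tasks);  // drain the remainder
}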

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::createNewFragment ( const Data_Namespace::MemoryLevel  memory_level = Data_Namespace::DISK_LEVEL)
protected

creates a new fragment, calling the createChunk() method of BufferMgr to make a new chunk for each column of the table.

Also unpins the chunks of the previous insert buffer.

Definition at line 680 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), and Fragmenter_Namespace::FragmentInfo::fragmentId.

681  {
682  // also sets the new fragment as the insertBuffer for each column
683 
684  maxFragmentId_++;
685  auto newFragmentInfo = std::make_unique<FragmentInfo>();
686  newFragmentInfo->fragmentId = maxFragmentId_;
687  newFragmentInfo->shadowNumTuples = 0;
688  newFragmentInfo->setPhysicalNumTuples(0);
689  for (const auto levelSize : dataMgr_->levelSizes_) {
690  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
691  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
692  }
693  newFragmentInfo->physicalTableId = physicalTableId_;
694  newFragmentInfo->shard = shard_;
695 
696  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
697  colMapIt != columnMap_.end();
698  ++colMapIt) {
699  ChunkKey chunkKey = chunkKeyPrefix_;
700  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
701  chunkKey.push_back(maxFragmentId_);
702  colMapIt->second.createChunkBuffer(
703  dataMgr_,
704  chunkKey,
705  memoryLevel,
706  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
707  pageSize_);
708  colMapIt->second.initEncoder();
709  }
710 
711  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
712  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
713  return fragmentInfoVec_.back().get();
714 }
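As the loop over columnMap_ above shows, each new chunk's key is formed by appending the column id and the new fragment id to chunkKeyPrefix_, which already holds the database and table ids. A minimal sketch of that key layout, with purely illustrative values:

#include <iostream>
#include <vector>

using ChunkKey = std::vector<int>;  // {database_id, table_id, column_id, fragment_id}

int main() {
  const ChunkKey chunk_key_prefix{1 /*database_id*/, 7 /*table_id*/};
  const std::vector<int> column_ids{1, 2, 3};
  const int new_fragment_id = 42;

  for (const int column_id : column_ids) {
    ChunkKey chunk_key = chunk_key_prefix;  // copy {db, table}
    chunk_key.push_back(column_id);
    chunk_key.push_back(new_fragment_id);
    std::cout << "{" << chunk_key[0] << ", " << chunk_key[1] << ", "
              << chunk_key[2] << ", " << chunk_key[3] << "}\n";
  }
}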

void Fragmenter_Namespace::InsertOrderFragmenter::deleteFragments ( const std::vector< int > &  dropFragIds)
protected

Definition at line 229 of file InsertOrderFragmenter.cpp.

References catalog_(), and lockmgr::TableLockMgrImpl< TableDataLockMgr >::getWriteLockForTable().

229  {
230  // Fix a verified loophole on sharded logical table which is locked using logical
231  // tableId while it's its physical tables that can come here when fragments overflow
232  // during COPY. Locks on a logical table and its physical tables never intersect, which
233  // means potential races. It'll be an overkill to resolve a logical table to physical
234  // tables in DBHandler, ParseNode or other higher layers where the logical table is
235  // locked with Table Read/Write locks; it's easier to lock the logical table of its
236  // physical tables. A downside of this approach may be loss of parallel execution of
237  // deleteFragments across physical tables. Because deleteFragments is a short in-memory
238  // operation, the loss seems not a big deal.
239  auto chunkKeyPrefix = chunkKeyPrefix_;
240  if (shard_ >= 0) {
241  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
242  }
243 
244  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
245  // SELECT and COPY may enter a deadlock
246  const auto delete_lock =
248 
249  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
250 
251  for (const auto fragId : dropFragIds) {
252  for (const auto& col : columnMap_) {
253  int colId = col.first;
254  vector<int> fragPrefix = chunkKeyPrefix_;
255  fragPrefix.push_back(colId);
256  fragPrefix.push_back(fragId);
257  dataMgr_->deleteChunksWithPrefix(fragPrefix);
258  }
259  }
260 }

void Fragmenter_Namespace::InsertOrderFragmenter::dropColumns ( const std::vector< int > &  columnIds)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 475 of file InsertOrderFragmenter.cpp.

475  {
476  // prevent concurrent insert rows and drop column
477  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
478  // synchronize concurrent accesses to fragmentInfoVec_
479  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
480  for (auto const& fragmentInfo : fragmentInfoVec_) {
481  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
482  }
483 
484  for (const auto columnId : columnIds) {
485  auto cit = columnMap_.find(columnId);
486  if (columnMap_.end() != cit) {
487  columnMap_.erase(cit);
488  }
489 
490  vector<int> fragPrefix = chunkKeyPrefix_;
491  fragPrefix.push_back(columnId);
492  dataMgr_->deleteChunksWithPrefix(fragPrefix);
493 
494  for (const auto& fragmentInfo : fragmentInfoVec_) {
495  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
496  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
497  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
498  }
499  }
500  }
501  for (const auto& fragmentInfo : fragmentInfoVec_) {
502  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
503  }
504 }
void Fragmenter_Namespace::InsertOrderFragmenter::dropFragmentsToSize ( const size_t  maxRows)
overridevirtual

Will truncate table to less than maxRows by dropping fragments.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 202 of file InsertOrderFragmenter.cpp.

References CHECK_GE, CHECK_GT, DROP_FRAGMENT_FACTOR, logger::INFO, and LOG.

202  {
203  // not safe to call from outside insertData
204  // b/c depends on insertLock around numTuples_
205 
206  // don't ever drop the only fragment!
207  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
208  return;
209  }
210 
211  if (numTuples_ > maxRows) {
212  size_t preNumTuples = numTuples_;
213  vector<int> dropFragIds;
214  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
215  while (numTuples_ > targetRows) {
216  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
217  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
218  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
219  fragmentInfoVec_.pop_front();
220  CHECK_GE(numTuples_, numFragTuples);
221  numTuples_ -= numFragTuples;
222  }
223  deleteFragments(dropFragIds);
224  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
225  << " post: " << numTuples_ << " maxRows: " << maxRows;
226  }
227 }
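The loop above drops the oldest fragments from the front of fragmentInfoVec_ until the row count falls below maxRows scaled by DROP_FRAGMENT_FACTOR. A self-contained sketch with illustrative numbers follows; the 0.9 factor is an assumption for illustration only, not the actual value of DROP_FRAGMENT_FACTOR.

#include <cstddef>
#include <deque>
#include <iostream>

int main() {
  // Oldest fragment first; each value is that fragment's row count (illustrative).
  std::deque<std::size_t> fragment_rows{100, 100, 100, 100, 50};
  std::size_t num_tuples = 450;
  const std::size_t max_rows = 300;
  const double drop_fragment_factor = 0.9;  // illustrative stand-in for DROP_FRAGMENT_FACTOR

  if (num_tuples > max_rows) {
    const std::size_t target_rows = max_rows * drop_fragment_factor;  // 270
    while (num_tuples > target_rows) {
      num_tuples -= fragment_rows.front();  // drop the oldest fragment
      fragment_rows.pop_front();
    }
  }
  std::cout << num_tuples << " rows remain in " << fragment_rows.size()
            << " fragments\n";  // 250 rows in 3 fragments
}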
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::getChunkKeyPrefix ( ) const
inline

Definition at line 103 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

void Fragmenter_Namespace::InsertOrderFragmenter::getChunkMetadata ( )
protected

Definition at line 112 of file InsertOrderFragmenter.cpp.

References CHECK(), Fragmenter_Namespace::anonymous_namespace{InsertOrderFragmenter.cpp}::compute_device_for_fragment(), Data_Namespace::DISK_LEVEL, logger::FATAL, LOG, and to_string().

112  {
113  if (uses_foreign_storage_ ||
114  defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
115  // memory-resident tables won't have anything on disk
116  ChunkMetadataVector chunk_metadata;
117  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
118 
119  // data comes like this - database_id, table_id, column_id, fragment_id
120  // but lets sort by database_id, table_id, fragment_id, column_id
121 
122  int fragment_subkey_index = 3;
123  std::sort(chunk_metadata.begin(),
124  chunk_metadata.end(),
125  [&](const auto& pair1, const auto& pair2) {
126  return pair1.first[3] < pair2.first[3];
127  });
128 
129  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
130  ++chunk_itr) {
131  int cur_column_id = chunk_itr->first[2];
132  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
133 
134  if (fragmentInfoVec_.empty() ||
135  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
136  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
137  CHECK(new_fragment_info);
138  maxFragmentId_ = cur_fragment_id;
139  new_fragment_info->fragmentId = cur_fragment_id;
140  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
141  numTuples_ += new_fragment_info->getPhysicalNumTuples();
142  for (const auto level_size : dataMgr_->levelSizes_) {
143  new_fragment_info->deviceIds.push_back(
144  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
145  }
146  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
147  new_fragment_info->physicalTableId = physicalTableId_;
148  new_fragment_info->shard = shard_;
149  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
150  } else {
151  if (chunk_itr->second->numElements !=
152  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
153  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
154  std::to_string(physicalTableId_) + ", Column " +
155  std::to_string(cur_column_id) + ". Fragment Tuples: " +
156  std::to_string(
157  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
158  ", Chunk Tuples: " +
159  std::to_string(chunk_itr->second->numElements);
160  }
161  }
162  CHECK(fragmentInfoVec_.back().get());
163  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
164  }
165  }
166 
167  ssize_t maxFixedColSize = 0;
168 
169  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
170  ssize_t size = colIt->second.getColumnDesc()->columnType.get_size();
171  if (size == -1) { // variable length
172  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
173  size = 8; // b/c we use this for string and array indices - gross to have magic
174  // number here
175  }
176  maxFixedColSize = std::max(maxFixedColSize, size);
177  }
178 
179  // this is maximum number of rows assuming everything is fixed length
180  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
181 
182  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
183  // Now need to get the insert buffers for each column - should be last
184  // fragment
185  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
186  // TODO: add accessor here for safe indexing
187  int deviceId =
188  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
189  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
190  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
191  insertKey.push_back(colIt->first); // column id
192  insertKey.push_back(lastFragmentId); // fragment id
193  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
194  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
195  if (varLenColInfoIt != varLenColInfo_.end()) {
196  varLenColInfoIt->second = colIt->second.getBuffer()->size();
197  }
198  }
199  }
200 }
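Note the cap computed near the end of the listing: maxFragmentRows_ is reduced to maxChunkSize_ / maxFixedColSize, where variable-length columns are counted as 8 bytes for their offset entries. A small worked example with assumed values:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  // Illustrative values only.
  std::size_t max_fragment_rows = 32'000'000;
  const std::size_t max_chunk_size = 1'073'741'824;  // 1 GiB per chunk
  const std::size_t max_fixed_col_size = 8;          // widest fixed-width column, in bytes;
                                                     // var-len columns count as 8 for offsets
  // 1 GiB / 8 bytes = 134,217,728 rows, so the smaller configured limit wins here.
  max_fragment_rows = std::min(max_fragment_rows, max_chunk_size / max_fixed_col_size);
  std::cout << "fragment capped at " << max_fragment_rows << " rows\n";  // 32000000
}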

auto Fragmenter_Namespace::InsertOrderFragmenter::getChunksForAllColumns ( const TableDescriptor *  td,
const FragmentInfo &  fragment,
const Data_Namespace::MemoryLevel  memory_level 
)

Definition at line 994 of file UpdelStorage.cpp.

References catalog_, CHECK(), Catalog_Namespace::DBMetadata::dbId, Fragmenter_Namespace::FragmentInfo::fragmentId, Chunk_NS::Chunk::getChunk(), Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getMetadataForColumn(), TableDescriptor::nColumns, and TableDescriptor::tableId.

Referenced by compactRows().

997  {
998  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
999  // coming from updateColumn (on '$delete$' column) we dont have chunks for all columns
1000  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1001  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1002  ++ncol;
1003  if (!cd->isVirtualCol) {
1004  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1005  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1006  ChunkKey chunk_key{
1007  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1008  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1009  &catalog_->getDataMgr(),
1010  chunk_key,
1011  memory_level,
1012  0,
1013  chunk_meta_it->second->numBytes,
1014  chunk_meta_it->second->numElements);
1015  chunks.push_back(chunk);
1016  }
1017  }
1018  }
1019  return chunks;
1020 }

int Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterId ( )
inlineoverridevirtual

get fragmenter's id

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 102 of file InsertOrderFragmenter.h.

References chunkKeyPrefix_.

102 { return chunkKeyPrefix_.back(); }
std::string Fragmenter_Namespace::InsertOrderFragmenter::getFragmenterType ( )
inlineoverridevirtual

get fragmenter's type (as a string)

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 107 of file InsertOrderFragmenter.h.

References fragmenterType_.

FragmentInfo * Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfo ( const int  fragment_id) const
overridevirtual

Retrieve the fragment info object for an individual fragment for editing.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 352 of file InsertOrderFragmenter.cpp.

References CHECK().

Referenced by compactRows(), updateColumn(), and updateColumns().

352  {
353  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
354  fragmentInfoVec_.end(),
355  [fragment_id](const auto& fragment) -> bool {
356  return fragment->fragmentId == fragment_id;
357  });
358  CHECK(fragment_it != fragmentInfoVec_.end());
359  return fragment_it->get();
360 }

FragmentInfo& Fragmenter_Namespace::InsertOrderFragmenter::getFragmentInfoFromId ( const int  fragment_id)
protected
TableInfo Fragmenter_Namespace::InsertOrderFragmenter::getFragmentsForQuery ( )
overridevirtual

returns a TableInfo object containing the ids and row counts of all fragments

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 716 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::TableInfo::chunkKeyPrefix, Fragmenter_Namespace::FragmentInfo::deviceIds, Fragmenter_Namespace::FragmentInfo::fragmentId, Fragmenter_Namespace::TableInfo::fragments, Fragmenter_Namespace::TableInfo::getPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::physicalTableId, Fragmenter_Namespace::FragmentInfo::setPhysicalNumTuples(), Fragmenter_Namespace::TableInfo::setPhysicalNumTuples(), Fragmenter_Namespace::FragmentInfo::shadowNumTuples, and Fragmenter_Namespace::FragmentInfo::shard.

716  {
717  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
718  TableInfo queryInfo;
719  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
720  // right now we don't test predicate, so just return (copy of) all fragments
721  bool fragmentsExist = false;
722  if (fragmentInfoVec_.empty()) {
723  // If we have no fragments add a dummy empty fragment to make the executor
724  // not have separate logic for 0-row tables
725  int maxFragmentId = 0;
726  FragmentInfo emptyFragmentInfo;
727  emptyFragmentInfo.fragmentId = maxFragmentId;
728  emptyFragmentInfo.shadowNumTuples = 0;
729  emptyFragmentInfo.setPhysicalNumTuples(0);
730  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
731  emptyFragmentInfo.physicalTableId = physicalTableId_;
732  emptyFragmentInfo.shard = shard_;
733  queryInfo.fragments.push_back(emptyFragmentInfo);
734  } else {
735  fragmentsExist = true;
736  std::for_each(
737  fragmentInfoVec_.begin(),
738  fragmentInfoVec_.end(),
739  [&queryInfo](const auto& fragment_owned_ptr) {
740  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
741  });
742  }
743  readLock.unlock();
744  queryInfo.setPhysicalNumTuples(0);
745  auto partIt = queryInfo.fragments.begin();
746  if (fragmentsExist) {
747  while (partIt != queryInfo.fragments.end()) {
748  if (partIt->getPhysicalNumTuples() == 0) {
749  // this means that a concurrent insert query inserted tuples into a new fragment
750  // but when the query came in we didn't have this fragment. To make sure we don't
751  // mess up the executor we delete this fragment from the metadatamap (fixes
752  // earlier bug found 2015-05-08)
753  partIt = queryInfo.fragments.erase(partIt);
754  } else {
755  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
756  partIt->getPhysicalNumTuples());
757  ++partIt;
758  }
759  }
760  } else {
761  // We added a dummy fragment and know the table is empty
762  queryInfo.setPhysicalNumTuples(0);
763  }
764  return queryInfo;
765 }

size_t Fragmenter_Namespace::InsertOrderFragmenter::getNumRows ( )
inlineoverridevirtual
const std::vector< uint64_t > Fragmenter_Namespace::InsertOrderFragmenter::getVacuumOffsets ( const std::shared_ptr< Chunk_NS::Chunk > &  chunk)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 1023 of file UpdelStorage.cpp.

References CHECK(), cpu_threads(), and Fragmenter_Namespace::wait_cleanup_threads().

Referenced by updateColumn().

1024  {
1025  const auto data_buffer = chunk->getBuffer();
1026  const auto data_addr = data_buffer->getMemoryPtr();
1027  const size_t nrows_in_chunk = data_buffer->size();
1028  const size_t ncore = cpu_threads();
1029  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1030  std::vector<std::vector<uint64_t>> deleted_offsets;
1031  deleted_offsets.resize(ncore);
1032  std::vector<std::future<void>> threads;
1033  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1034  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1035  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1036  const auto ithread = rbegin / segsz;
1037  CHECK(ithread < deleted_offsets.size());
1038  deleted_offsets[ithread].reserve(segsz);
1039  for (size_t r = rbegin; r < rend; ++r) {
1040  if (data_addr[r]) {
1041  deleted_offsets[ithread].push_back(r);
1042  }
1043  }
1044  }));
1045  }
1046  wait_cleanup_threads(threads);
1047  std::vector<uint64_t> all_deleted_offsets;
1048  for (size_t i = 0; i < ncore; ++i) {
1049  all_deleted_offsets.insert(
1050  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1051  }
1052  return all_deleted_offsets;
1053 }
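The function splits the chunk's rows into roughly equal segments of segsz = (nrows + ncore - 1) / ncore, scans each segment in its own std::async task, and concatenates the per-thread hit lists in thread order. A standalone sketch of the same pattern, using an illustrative in-memory "delete column" rather than the Chunk/Buffer API:

#include <algorithm>
#include <cstdint>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // Illustrative delete column: nonzero means the row is deleted.
  const std::vector<int8_t> deleted{0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1};
  const std::size_t nrows = deleted.size();
  const std::size_t ncore = std::max(1u, std::thread::hardware_concurrency());
  const std::size_t segsz = (nrows + ncore - 1) / ncore;  // ceil(nrows / ncore)

  std::vector<std::vector<uint64_t>> per_thread(ncore);
  std::vector<std::future<void>> tasks;
  for (std::size_t begin = 0; begin < nrows; begin += segsz) {
    tasks.emplace_back(std::async(std::launch::async, [&, begin] {
      const std::size_t end = std::min(begin + segsz, nrows);
      const std::size_t ithread = begin / segsz;  // each task owns its own bucket
      for (std::size_t r = begin; r < end; ++r) {
        if (deleted[r]) {
          per_thread[ithread].push_back(r);
        }
      }
    }));
  }
  for (auto& t : tasks) {
    t.wait();
  }
  // Concatenate per-thread results in thread order, as the listing does.
  std::vector<uint64_t> offsets;
  for (const auto& part : per_thread) {
    offsets.insert(offsets.end(), part.begin(), part.end());
  }
  for (const auto off : offsets) {
    std::cout << off << " ";  // 1 4 5 7 11
  }
  std::cout << "\n";
}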

bool Fragmenter_Namespace::InsertOrderFragmenter::hasDeletedRows ( const int  delete_column_id)
overridevirtual

Iterates through chunk metadata to return whether any rows have been deleted.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 506 of file InsertOrderFragmenter.cpp.

References CHECK().

506  {
507  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
508 
509  for (auto const& fragment : fragmentInfoVec_) {
510  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
511  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
512  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
513  if (chunk_stats.max.tinyintval == 1) {
514  return true;
515  }
516  }
517  return false;
518 }

void Fragmenter_Namespace::InsertOrderFragmenter::insertData ( InsertData &  insertDataStruct)
overridevirtual

appends data onto the most recently occurring fragment, creating a new one if necessary

Todo:
be able to fill up current fragment in multi-row insert before creating new fragment

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 362 of file InsertOrderFragmenter.cpp.

References catalog_(), Fragmenter_Namespace::InsertData::databaseId, Data_Namespace::DISK_LEVEL, and Fragmenter_Namespace::InsertData::tableId.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertData().

362  {
363  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
364  try {
365  mapd_unique_lock<mapd_shared_mutex> insertLock(
366  insertMutex_); // prevent two threads from trying to insert into the same table
367  // simultaneously
368 
369  insertDataImpl(insertDataStruct);
370 
371  if (defaultInsertLevel_ ==
372  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
373  dataMgr_->checkpoint(
374  chunkKeyPrefix_[0],
375  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
376  }
377  } catch (...) {
378  int32_t tableEpoch =
379  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
380 
381  // the statement below deletes *this* object!
382  // relying on exception propagation at this stage
383  // until we can sort this out in a cleaner fashion
384  catalog_->setTableEpoch(
385  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
386  throw;
387  }
388 }
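On failure, the catch block reads the table's current epoch and re-sets it, rolling persisted data back to the last checkpoint before rethrowing. The sketch below is a simplified, standalone stand-in for that save-and-restore idea, using a toy store rather than the real Catalog/DataMgr API; all names in it are hypothetical.

#include <iostream>
#include <stdexcept>

// Illustrative stand-in for per-table epoch bookkeeping.
struct ToyTableStore {
  int epoch = 0;
  int rows = 0;

  void insert(int n, bool fail) {
    if (fail) {
      throw std::runtime_error("simulated insert failure");
    }
    rows += n;
    ++epoch;  // a successful, checkpointed insert advances the epoch
  }
};

// Mirror of the try/catch shape in the listing: remember the epoch and
// restore it if anything inside the insert throws, then rethrow.
void insert_with_rollback(ToyTableStore& store, int n, bool fail) {
  const int saved_epoch = store.epoch;
  try {
    store.insert(n, fail);
  } catch (...) {
    store.epoch = saved_epoch;  // analogous to setTableEpoch(...) in the listing
    throw;
  }
}

int main() {
  ToyTableStore store;
  insert_with_rollback(store, 10, /*fail=*/false);
  try {
    insert_with_rollback(store, 5, /*fail=*/true);
  } catch (const std::exception& e) {
    std::cout << "rolled back after: " << e.what() << "\n";
  }
  std::cout << "rows=" << store.rows << " epoch=" << store.epoch << "\n";  // rows=10 epoch=1
}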

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataImpl ( InsertData &  insertDataStruct)
protected

Definition at line 520 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), CHECK_GT, CHECK_LE, Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, DataBlockPtr::numbersPtr, Fragmenter_Namespace::InsertData::numRows, and Fragmenter_Namespace::InsertData::replicate_count.

520  {
521  // populate deleted system column if it should exists, as it will not come from client
522  // Do not add this magical column in the replicate ALTER TABLE ADD route as
523  // it is not needed and will cause issues
524  std::unique_ptr<int8_t[]> data_for_deleted_column;
525  for (const auto& cit : columnMap_) {
526  if (cit.second.getColumnDesc()->isDeletedCol &&
527  insertDataStruct.replicate_count == 0) {
528  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
529  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
530  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
531  insertDataStruct.columnIds.push_back(cit.second.getColumnDesc()->columnId);
532  break;
533  }
534  }
535  // MAT we need to add a removal of the empty column we pushed onto here
536  // for upstream safety. Should not be a problem but need to fix.
537 
538  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
539  for (const auto columnId : insertDataStruct.columnIds) {
540  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
541  CHECK(columnDesc);
542  if (columnMap_.end() == columnMap_.find(columnId)) {
543  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
544  }
545  }
546 
547  // when replicate (add) column(s), this path seems wont work; go separate route...
548  if (insertDataStruct.replicate_count > 0) {
549  replicateData(insertDataStruct);
550  return;
551  }
552 
553  std::unordered_map<int, int> inverseInsertDataColIdMap;
554  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
555  inverseInsertDataColIdMap.insert(
556  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
557  }
558 
559  size_t numRowsLeft = insertDataStruct.numRows;
560  size_t numRowsInserted = 0;
561  vector<DataBlockPtr> dataCopy =
562  insertDataStruct.data; // bc append data will move ptr forward and this violates
563  // constness of InsertData
564  if (numRowsLeft <= 0) {
565  return;
566  }
567 
568  FragmentInfo* currentFragment{nullptr};
569 
570  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
571  currentFragment = createNewFragment(defaultInsertLevel_);
572  } else {
573  currentFragment = fragmentInfoVec_.back().get();
574  }
575  CHECK(currentFragment);
576 
577  size_t startFragment = fragmentInfoVec_.size() - 1;
578 
579  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
580  // loop until done inserting all rows
581  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
582  size_t rowsLeftInCurrentFragment =
583  maxFragmentRows_ - currentFragment->shadowNumTuples;
584  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
585  if (rowsLeftInCurrentFragment != 0) {
586  for (auto& varLenColInfoIt : varLenColInfo_) {
587  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
588  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
589  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
590  if (insertIdIt != inverseInsertDataColIdMap.end()) {
591  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
592  numRowsToInsert = std::min(
593  numRowsToInsert,
594  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
595  numRowsToInsert,
596  numRowsInserted,
597  bytesLeft));
598  }
599  }
600  }
601 
602  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
603  currentFragment = createNewFragment(defaultInsertLevel_);
604  if (numRowsInserted == 0) {
605  startFragment++;
606  }
607  rowsLeftInCurrentFragment = maxFragmentRows_;
608  for (auto& varLenColInfoIt : varLenColInfo_) {
609  varLenColInfoIt.second = 0; // reset byte counter
610  }
611  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
612  for (auto& varLenColInfoIt : varLenColInfo_) {
613  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
614  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
615  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
616  if (insertIdIt != inverseInsertDataColIdMap.end()) {
617  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
618  numRowsToInsert = std::min(
619  numRowsToInsert,
620  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
621  numRowsToInsert,
622  numRowsInserted,
623  bytesLeft));
624  }
625  }
626  }
627 
628  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
629  // never be able to insert anything
630 
631  // for each column, append the data in the appropriate insert buffer
632  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
633  int columnId = insertDataStruct.columnIds[i];
634  auto colMapIt = columnMap_.find(columnId);
635  CHECK(colMapIt != columnMap_.end());
636  currentFragment->shadowChunkMetadataMap[columnId] =
637  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
638  auto varLenColInfoIt = varLenColInfo_.find(columnId);
639  if (varLenColInfoIt != varLenColInfo_.end()) {
640  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
641  }
642  }
643  if (hasMaterializedRowId_) {
644  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
645  currentFragment->shadowNumTuples;
646  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
647  for (size_t i = 0; i < numRowsToInsert; ++i) {
648  row_id_data[i] = i + startId;
649  }
650  DataBlockPtr rowIdBlock;
651  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
652  auto colMapIt = columnMap_.find(rowIdColId_);
653  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
654  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
655  }
656 
657  currentFragment->shadowNumTuples =
658  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
659  numRowsLeft -= numRowsToInsert;
660  numRowsInserted += numRowsToInsert;
661  }
662  {
663  // Only take the fragment info lock when updating fragment info map. Otherwise,
664  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
665  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
666  // COPY_FROM waits for TableWriteLock
667  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
668  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
669  partIt != fragmentInfoVec_.end();
670  ++partIt) {
671  auto fragment_ptr = partIt->get();
672  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
673  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
674  }
675  }
676  numTuples_ += insertDataStruct.numRows;
677  dropFragmentsToSize(maxRows_);
678 }
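When the table has a materialized rowid column (hasMaterializedRowId_), the listing assigns each inserted row an id of maxFragmentRows_ * fragmentId + shadowNumTuples + i, so every fragment owns a disjoint id range. A small worked example with assumed values:

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Illustrative values: ids stay unique because each fragment owns a
  // disjoint range of size max_fragment_rows.
  const int64_t max_fragment_rows = 1000;
  const int64_t fragment_id = 3;
  const int64_t rows_already_in_fragment = 250;  // shadowNumTuples in the listing
  const int64_t num_rows_to_insert = 4;

  const int64_t start_id = max_fragment_rows * fragment_id + rows_already_in_fragment;
  std::vector<int64_t> row_ids(num_rows_to_insert);
  for (int64_t i = 0; i < num_rows_to_insert; ++i) {
    row_ids[i] = start_id + i;
  }
  for (const auto id : row_ids) {
    std::cout << id << " ";  // 3250 3251 3252 3253
  }
  std::cout << "\n";
}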

void Fragmenter_Namespace::InsertOrderFragmenter::insertDataNoCheckpoint ( InsertData &  insertDataStruct)
overridevirtual

Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks or checkpoints are taken; these must be managed externally.

Implements Fragmenter_Namespace::AbstractFragmenter.

Reimplemented in Fragmenter_Namespace::SortedOrderFragmenter.

Definition at line 390 of file InsertOrderFragmenter.cpp.

Referenced by Fragmenter_Namespace::SortedOrderFragmenter::insertDataNoCheckpoint(), and updateColumns().

390  {
391  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
392  mapd_unique_lock<mapd_shared_mutex> insertLock(
393  insertMutex_); // prevent two threads from trying to insert into the same table
394  // simultaneously
395  insertDataImpl(insertDataStruct);
396 }

void Fragmenter_Namespace::InsertOrderFragmenter::lockInsertCheckpointData ( const InsertData &  insertDataStruct)
protected
InsertOrderFragmenter& Fragmenter_Namespace::InsertOrderFragmenter::operator= ( const InsertOrderFragmenter )
protected
void Fragmenter_Namespace::InsertOrderFragmenter::replicateData ( const InsertData &  insertDataStruct)
protected

Definition at line 398 of file InsertOrderFragmenter.cpp.

References Fragmenter_Namespace::InsertData::bypass, catalog_(), CHECK(), Fragmenter_Namespace::InsertData::columnIds, Fragmenter_Namespace::InsertData::data, and Fragmenter_Namespace::InsertData::numRows.

398  {
399  // synchronize concurrent accesses to fragmentInfoVec_
400  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
401  size_t numRowsLeft = insertDataStruct.numRows;
402  for (auto const& fragmentInfo : fragmentInfoVec_) {
403  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
404  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
405  size_t numRowsCanBeInserted;
406  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
407  if (insertDataStruct.bypass[i]) {
408  continue;
409  }
410  auto columnId = insertDataStruct.columnIds[i];
411  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
412  CHECK(colDesc);
413  CHECK(columnMap_.find(columnId) != columnMap_.end());
414 
415  ChunkKey chunkKey = chunkKeyPrefix_;
416  chunkKey.push_back(columnId);
417  chunkKey.push_back(fragmentInfo->fragmentId);
418 
419  auto colMapIt = columnMap_.find(columnId);
420  auto& chunk = colMapIt->second;
421  if (chunk.isChunkOnDevice(
422  dataMgr_,
423  chunkKey,
424  defaultInsertLevel_,
425  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
426  dataMgr_->deleteChunksWithPrefix(chunkKey);
427  }
428  chunk.createChunkBuffer(
429  dataMgr_,
430  chunkKey,
431  defaultInsertLevel_,
432  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
433  chunk.initEncoder();
434 
435  try {
436  DataBlockPtr dataCopy = insertDataStruct.data[i];
437  auto size = colDesc->columnType.get_size();
438  if (0 > size) {
439  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
440  varLenColInfo_[columnId] = 0;
441  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
442  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
443  } else {
444  numRowsCanBeInserted = maxChunkSize_ / size;
445  }
446 
447  // FIXME: abort a case in which new column is wider than existing columns
448  if (numRowsCanBeInserted < numRowsToInsert) {
449  throw std::runtime_error("new column '" + colDesc->columnName +
450  "' wider than existing columns is not supported");
451  }
452 
453  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
454  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
455 
456  // update total size of var-len column in (actually the last) fragment
457  if (0 > size) {
458  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
459  varLenColInfo_[columnId] = chunk.getBuffer()->size();
460  }
461  } catch (...) {
462  dataMgr_->deleteChunksWithPrefix(chunkKey);
463  throw;
464  }
465  }
466  numRowsLeft -= numRowsToInsert;
467  }
468  CHECK(0 == numRowsLeft);
469 
470  for (auto const& fragmentInfo : fragmentInfoVec_) {
471  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
472  }
473 }

void Fragmenter_Namespace::InsertOrderFragmenter::setNumRows ( const size_t  numTuples)
inlineoverridevirtual
void Fragmenter_Namespace::InsertOrderFragmenter::updateChunkStats ( const ColumnDescriptor *  cd,
std::unordered_map< int, ChunkStats > &  stats_map 
)
overridevirtual

Update chunk stats.

WARNING: This method is entirely unlocked. Higher level locks are expected to prevent any table read or write during a chunk metadata update, since we need to modify various buffers and metadata maps.

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 262 of file InsertOrderFragmenter.cpp.

References catalog_(), CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnType, DatumToString(), Data_Namespace::DISK_LEVEL, get_logical_type_info(), Chunk_NS::Chunk::getChunk(), SQLTypeInfo::is_dict_encoded_string(), kBIGINT, LOG, showChunk(), VLOG, and logger::WARNING.

264  {
265  // synchronize concurrent accesses to fragmentInfoVec_
266  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
272  if (shard_ >= 0) {
273  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
274  }
275 
276  CHECK(cd);
277  const auto column_id = cd->columnId;
278  const auto col_itr = columnMap_.find(column_id);
279  CHECK(col_itr != columnMap_.end());
280 
281  for (auto const& fragment : fragmentInfoVec_) {
282  auto stats_itr = stats_map.find(fragment->fragmentId);
283  if (stats_itr != stats_map.end()) {
284  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
285  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
286  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
287  physicalTableId_,
288  column_id,
289  fragment->fragmentId};
290  auto chunk = Chunk_NS::Chunk::getChunk(cd,
291  &catalog_->getDataMgr(),
292  chunk_key,
293  defaultInsertLevel_,
294  0,
295  chunk_meta_it->second->numBytes,
296  chunk_meta_it->second->numElements);
297  auto buf = chunk->getBuffer();
298  CHECK(buf);
299  auto encoder = buf->encoder.get();
300  if (!encoder) {
301  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
302  }
303 
304  auto chunk_stats = stats_itr->second;
305 
306  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
307  encoder->getMetadata(old_chunk_metadata);
308  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
309 
310  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
311  // Use the logical type to display data, since the encoding should be ignored
312  const auto logical_ti = cd->columnType.is_dict_encoded_string()
313  ? SQLTypeInfo(kBIGINT, false)
314  : get_logical_type_info(cd->columnType);
315  if (!didResetStats) {
316  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
317  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
318  << DatumToString(chunk_stats.max, logical_ti);
319  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
320  << DatumToString(chunk_stats.min, logical_ti);
321  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
322  continue; // move to next fragment
323  }
324 
325  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
326  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
327  << DatumToString(chunk_stats.max, logical_ti);
328  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
329  << DatumToString(chunk_stats.min, logical_ti);
330  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
331 
332  // Reset fragment metadata map and set buffer to dirty
333  auto new_metadata = std::make_shared<ChunkMetadata>();
334  // Run through fillChunkStats to ensure any transformations to the raw metadata
335  // values get applied (e.g. for date in days)
336  encoder->getMetadata(new_metadata);
337 
338  fragment->setChunkMetadata(column_id, new_metadata);
339  fragment->shadowChunkMetadataMap =
340  fragment->getChunkMetadataMap(); // TODO(adb): needed?
341  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
342  buf->setDirty();
343  }
344  } else {
345  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
346  << ", table " << physicalTableId_ << ", "
347  << ", column " << column_id;
348  }
349  }
350 }

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const std::string &  tab_name,
const std::string &  col_name,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
static

Definition at line 53 of file UpdelStorage.cpp.

References CHECK(), Catalog_Namespace::Catalog::getMetadataForColumn(), and Catalog_Namespace::Catalog::getMetadataForTable().

Referenced by updateColumn().

61  {
62  const auto td = catalog->getMetadataForTable(tab_name);
63  CHECK(td);
64  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
65  CHECK(cd);
66  td->fragmenter->updateColumn(catalog,
67  td,
68  cd,
69  fragment_id,
70  frag_offsets,
71  rhs_values,
72  rhs_type,
73  memory_level,
74  updel_roll);
75 }
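The overload above is a name-based convenience wrapper: it resolves the TableDescriptor and ColumnDescriptor from the catalog and forwards to the fragmenter-level overload below. A minimal, hypothetical call-site sketch (the table name, column name, offsets, and literal are invented for illustration; a valid catalog object and the two-argument SQLTypeInfo constructor are assumed):

 UpdelRoll updel_roll;
 std::vector<uint64_t> frag_offsets{0, 3, 7};             // rows within fragment 0
 std::vector<ScalarTargetValue> rhs_values{double(9.5)};  // one value applied to every listed row
 Fragmenter_Namespace::InsertOrderFragmenter::updateColumn(
     &catalog,                      // Catalog_Namespace::Catalog*
     "trips",                       // resolved via getMetadataForTable()
     "fare",                        // resolved via getMetadataForColumn()
     /*fragment_id=*/0,
     frag_offsets,
     rhs_values,
     SQLTypeInfo(kDOUBLE, false),   // type of the RHS values
     Data_Namespace::CPU_LEVEL,
     updel_roll);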

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const std::vector< ScalarTargetValue > &  rhs_values,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 593 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, compactRows(), Data_Namespace::CPU_LEVEL, cpu_threads(), Catalog_Namespace::DBMetadata::dbId, anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_compression(), SQLTypeInfo::get_dimension(), anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), DateConverters::get_epoch_seconds_from_days(), SQLTypeInfo::get_scale(), Chunk_NS::Chunk::getChunk(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), getFragmentInfo(), Catalog_Namespace::Catalog::getLogicalTableId(), Catalog_Namespace::Catalog::getMetadataForColumn(), Catalog_Namespace::Catalog::getMetadataForDict(), getVacuumOffsets(), SQLTypeInfo::is_boolean(), SQLTypeInfo::is_decimal(), SQLTypeInfo::is_fp(), Fragmenter_Namespace::is_integral(), SQLTypeInfo::is_string(), SQLTypeInfo::is_time(), ColumnDescriptor::isDeletedCol, kENCODING_DICT, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, UpdelRoll::mutex, anonymous_namespace{TypedDataAccessors.h}::put_null(), shard_, ColumnDescriptor::tableId, TableDescriptor::tableId, anonymous_namespace{TypedDataAccessors.h}::tabulate_metadata(), temp_mutex_, to_string(), Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_, UNREACHABLE, updateColumnMetadata(), NullAwareValidator< INNER_VALIDATOR >::validate(), and Fragmenter_Namespace::wait_cleanup_threads().

601  {
602  updel_roll.catalog = catalog;
603  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
604  updel_roll.memoryLevel = memory_level;
605 
606  const size_t ncore = cpu_threads();
607  const auto nrow = frag_offsets.size();
608  const auto n_rhs_values = rhs_values.size();
609  if (0 == nrow) {
610  return;
611  }
612  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
613 
614  auto fragment_ptr = getFragmentInfo(fragment_id);
615  auto& fragment = *fragment_ptr;
616  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
617  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
618  ChunkKey chunk_key{
619  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
620  auto chunk = Chunk_NS::Chunk::getChunk(cd,
621  &catalog->getDataMgr(),
622  chunk_key,
623  Data_Namespace::CPU_LEVEL,
624  0,
625  chunk_meta_it->second->numBytes,
626  chunk_meta_it->second->numElements);
627 
628  std::vector<int8_t> has_null_per_thread(ncore, 0);
629  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
630  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
631  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
632  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
633 
634  // parallel update elements
635  std::vector<std::future<void>> threads;
636 
637  const auto segsz = (nrow + ncore - 1) / ncore;
638  auto dbuf = chunk->getBuffer();
639  auto dbuf_addr = dbuf->getMemoryPtr();
640  dbuf->setUpdated();
641  {
642  std::lock_guard<std::mutex> lck(updel_roll.mutex);
643  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
644  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
645  }
646 
647  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
648  cd->tableId,
649  cd->columnId,
650  fragment.fragmentId};
651  updel_roll.dirtyChunkeys.insert(chunkey);
652  }
653  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
654  threads.emplace_back(std::async(
655  std::launch::async,
656  [=,
657  &has_null_per_thread,
658  &min_int64t_per_thread,
659  &max_int64t_per_thread,
660  &min_double_per_thread,
661  &max_double_per_thread,
662  &frag_offsets,
663  &rhs_values] {
664  SQLTypeInfo lhs_type = cd->columnType;
665 
666  // !! not sure if this is an undocumented convention or a bug, but for a sharded
667  // table the dictionary id of an encoded string column is not specified by
668  // comp_param in the physical table but somehow in the logical table :) comp_param
669  // in the physical table is always 0, so we need to adapt accordingly...
670  auto cdl = (shard_ < 0)
671  ? cd
672  : catalog->getMetadataForColumn(
673  catalog->getLogicalTableId(td->tableId), cd->columnId);
674  CHECK(cdl);
675  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
676  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
677  lhs_type, &decimalOverflowValidator);
678  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
679  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
680  lhs_type, &dateDaysOverflowValidator);
681 
682  StringDictionary* stringDict{nullptr};
683  if (lhs_type.is_string()) {
684  CHECK(kENCODING_DICT == lhs_type.get_compression());
685  auto dictDesc = const_cast<DictDescriptor*>(
686  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
687  CHECK(dictDesc);
688  stringDict = dictDesc->stringDict.get();
689  CHECK(stringDict);
690  }
691 
692  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
693  const auto roffs = frag_offsets[r];
694  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
695  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
696  ScalarTargetValue sv2;
697 
698  // A subtle point: there are two cases of string-to-string assignment where
699  // upstream passes the RHS string as a string index instead of the preferred
700  // "real string".
701  // case #1. For "SET str_col = str_literal", it is hard to resolve a temp str
702  // index
703  // in this layer, so if upstream passes a str idx here, an
704  // exception is thrown.
705  // case #2. For "SET str_col1 = str_col2", the RHS str idx is converted to the
706  // LHS str idx.
707  if (rhs_type.is_string()) {
708  if (const auto vp = boost::get<int64_t>(sv)) {
709  auto dictDesc = const_cast<DictDescriptor*>(
710  catalog->getMetadataForDict(rhs_type.get_comp_param()));
711  if (nullptr == dictDesc) {
712  throw std::runtime_error(
713  "UPDATE does not support cast from string literal to string "
714  "column.");
715  }
716  auto stringDict = dictDesc->stringDict.get();
717  CHECK(stringDict);
718  sv2 = NullableString(stringDict->getString(*vp));
719  sv = &sv2;
720  }
721  }
722 
723  if (const auto vp = boost::get<int64_t>(sv)) {
724  auto v = *vp;
725  if (lhs_type.is_string()) {
726  throw std::runtime_error("UPDATE does not support cast to string.");
727  }
728  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
729  if (lhs_type.is_decimal()) {
730  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
731  int64_t decimal_val;
732  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
733  tabulate_metadata(lhs_type,
734  min_int64t_per_thread[c],
735  max_int64t_per_thread[c],
736  has_null_per_thread[c],
737  (v == inline_int_null_value<int64_t>() &&
738  lhs_type.get_notnull() == false)
739  ? v
740  : decimal_val);
741  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
742  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
743  if (positive_v_and_negative_d || negative_v_and_positive_d) {
744  throw std::runtime_error(
745  "Data conversion overflow on " + std::to_string(v) +
746  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
747  std::to_string(rhs_type.get_scale()) + ") to (" +
748  std::to_string(lhs_type.get_dimension()) + ", " +
749  std::to_string(lhs_type.get_scale()) + ")");
750  }
751  } else if (is_integral(lhs_type)) {
752  if (lhs_type.is_date_in_days()) {
753  // Store meta values in seconds
754  if (lhs_type.get_size() == 2) {
755  nullAwareDateOverflowValidator.validate<int16_t>(v);
756  } else {
757  nullAwareDateOverflowValidator.validate<int32_t>(v);
758  }
759  int64_t days;
760  get_scalar<int64_t>(data_ptr, lhs_type, days);
761  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
762  tabulate_metadata(lhs_type,
763  min_int64t_per_thread[c],
764  max_int64t_per_thread[c],
765  has_null_per_thread[c],
766  (v == inline_int_null_value<int64_t>() &&
767  lhs_type.get_notnull() == false)
768  ? NullSentinelSupplier()(lhs_type, v)
769  : seconds);
770  } else {
771  int64_t target_value;
772  if (rhs_type.is_decimal()) {
773  target_value = round(decimal_to_double(rhs_type, v));
774  } else {
775  target_value = v;
776  }
777  tabulate_metadata(lhs_type,
778  min_int64t_per_thread[c],
779  max_int64t_per_thread[c],
780  has_null_per_thread[c],
781  target_value);
782  }
783  } else {
784  tabulate_metadata(
785  lhs_type,
786  min_double_per_thread[c],
787  max_double_per_thread[c],
788  has_null_per_thread[c],
789  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
790  }
791  } else if (const auto vp = boost::get<double>(sv)) {
792  auto v = *vp;
793  if (lhs_type.is_string()) {
794  throw std::runtime_error("UPDATE does not support cast to string.");
795  }
796  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
797  if (lhs_type.is_integer()) {
798  tabulate_metadata(lhs_type,
799  min_int64t_per_thread[c],
800  max_int64t_per_thread[c],
801  has_null_per_thread[c],
802  int64_t(v));
803  } else if (lhs_type.is_fp()) {
804  tabulate_metadata(lhs_type,
805  min_double_per_thread[c],
806  max_double_per_thread[c],
807  has_null_per_thread[c],
808  double(v));
809  } else {
810  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
811  "LHS with a floating RHS.";
812  }
813  } else if (const auto vp = boost::get<float>(sv)) {
814  auto v = *vp;
815  if (lhs_type.is_string()) {
816  throw std::runtime_error("UPDATE does not support cast to string.");
817  }
818  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
819  if (lhs_type.is_integer()) {
820  tabulate_metadata(lhs_type,
821  min_int64t_per_thread[c],
822  max_int64t_per_thread[c],
823  has_null_per_thread[c],
824  int64_t(v));
825  } else {
826  tabulate_metadata(lhs_type,
827  min_double_per_thread[c],
828  max_double_per_thread[c],
829  has_null_per_thread[c],
830  double(v));
831  }
832  } else if (const auto vp = boost::get<NullableString>(sv)) {
833  const auto s = boost::get<std::string>(vp);
834  const auto sval = s ? *s : std::string("");
835  if (lhs_type.is_string()) {
836  decltype(stringDict->getOrAdd(sval)) sidx;
837  {
838  std::unique_lock<std::mutex> lock(temp_mutex_);
839  sidx = stringDict->getOrAdd(sval);
840  }
841  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
842  tabulate_metadata(lhs_type,
843  min_int64t_per_thread[c],
844  max_int64t_per_thread[c],
845  has_null_per_thread[c],
846  int64_t(sidx));
847  } else if (sval.size() > 0) {
848  auto dval = std::atof(sval.data());
849  if (lhs_type.is_boolean()) {
850  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
851  } else if (lhs_type.is_time()) {
852  throw std::runtime_error(
853  "Date/Time/Timestamp update not supported through translated "
854  "string path.");
855  }
856  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
857  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
858  tabulate_metadata(lhs_type,
859  min_double_per_thread[c],
860  max_double_per_thread[c],
861  has_null_per_thread[c],
862  double(dval));
863  } else {
864  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
865  tabulate_metadata(lhs_type,
866  min_int64t_per_thread[c],
867  max_int64t_per_thread[c],
868  has_null_per_thread[c],
869  int64_t(dval));
870  }
871  } else {
872  put_null(data_ptr, lhs_type, cd->columnName);
873  has_null_per_thread[c] = true;
874  }
875  } else {
876  CHECK(false);
877  }
878  }
879  }));
880  if (threads.size() >= (size_t)cpu_threads()) {
881  wait_cleanup_threads(threads);
882  }
883  }
884  wait_cleanup_threads(threads);
885 
886  // for unit test
887  if (fragment.unconditionalVacuum_) {
888  if (cd->isDeletedCol) {
889  const auto deleted_offsets = getVacuumOffsets(chunk);
890  if (deleted_offsets.size() > 0) {
891  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
892  return;
893  }
894  }
895  }
896  bool has_null_per_chunk{false};
897  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
898  double min_double_per_chunk{std::numeric_limits<double>::max()};
899  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
900  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
901  for (size_t c = 0; c < ncore; ++c) {
902  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
903  max_double_per_chunk =
904  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
905  min_double_per_chunk =
906  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
907  max_int64t_per_chunk =
908  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
909  min_int64t_per_chunk =
910  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
911  }
912  updateColumnMetadata(cd,
913  fragment,
914  chunk,
915  has_null_per_chunk,
916  max_double_per_chunk,
917  min_double_per_chunk,
918  max_int64t_per_chunk,
919  min_int64t_per_chunk,
920  cd->columnType,
921  updel_roll);
922 }
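For case #2 in the comments above ("SET str_col1 = str_col2" on dictionary-encoded columns), the loop decodes the RHS index through the RHS dictionary and re-encodes the resulting string in the LHS dictionary before writing. A minimal sketch of just that translation, with hypothetical rhs_dict and lhs_dict standing in for the two columns' StringDictionary instances:

 int64_t rhs_index = 42;                                    // index handed down from upstream
 std::string real_string = rhs_dict->getString(rhs_index);  // decode through the RHS dictionary
 int32_t lhs_index = lhs_dict->getOrAdd(real_string);       // re-encode (or add) in the LHS dictionary
 // lhs_index is then what put_scalar<int64_t>() writes into the target chunk.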

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumn ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const ColumnDescriptor *  cd,
const int  fragment_id,
const std::vector< uint64_t > &  frag_offsets,
const ScalarTargetValue &  rhs_value,
const SQLTypeInfo &  rhs_type,
const Data_Namespace::MemoryLevel  memory_level,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 77 of file UpdelStorage.cpp.

References updateColumn().

85  {
86  updateColumn(catalog,
87  td,
88  cd,
89  fragment_id,
90  frag_offsets,
91  std::vector<ScalarTargetValue>(1, rhs_value),
92  rhs_type,
93  memory_level,
94  updel_roll);
95 }

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumnMetadata ( const ColumnDescriptor *  cd,
FragmentInfo &  fragment,
std::shared_ptr< Chunk_NS::Chunk >  chunk,
const bool  null,
const double  dmax,
const double  dmin,
const int64_t  lmax,
const int64_t  lmin,
const SQLTypeInfo &  rhs_type,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 924 of file UpdelStorage.cpp.

References UpdelRoll::catalog, UpdelRoll::chunkMetadata, ColumnDescriptor::columnId, ColumnDescriptor::columnType, Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysical(), Catalog_Namespace::Catalog::getMetadataForTable(), SQLTypeInfo::is_decimal(), Fragmenter_Namespace::is_integral(), kENCODING_DICT, UpdelRoll::mutex, UpdelRoll::numTuples, Fragmenter_Namespace::FragmentInfo::shadowNumTuples, ColumnDescriptor::tableId, and foreign_storage::update_stats().

Referenced by compactRows(), and updateColumn().

933  {
934  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
935  auto key = std::make_pair(td, &fragment);
936  std::lock_guard<std::mutex> lck(updel_roll.mutex);
937  if (0 == updel_roll.chunkMetadata.count(key)) {
938  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
939  }
940  if (0 == updel_roll.numTuples.count(key)) {
941  updel_roll.numTuples[key] = fragment.shadowNumTuples;
942  }
943  auto& chunkMetadata = updel_roll.chunkMetadata[key];
944 
945  auto buffer = chunk->getBuffer();
946  const auto& lhs_type = cd->columnType;
947 
948  auto encoder = buffer->encoder.get();
949  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
950  static_assert(std::is_same<decltype(min), decltype(max)>::value,
951  "Type mismatch on min/max");
952  if (has_null) {
953  encoder->updateStats(decltype(min)(), true);
954  }
955  if (max < min) {
956  return;
957  }
958  encoder->updateStats(min, false);
959  encoder->updateStats(max, false);
960  };
961 
962  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
963  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
964  } else if (lhs_type.is_fp()) {
965  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
966  } else if (lhs_type.is_decimal()) {
967  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
968  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
969  has_null_per_chunk);
970  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
971  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
972  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
973  }
974  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
975 
976  // removed as @alex suggests. keep it commented in case of any chance to revisit
977  // it once after vacuum code is introduced. fragment.invalidateChunkMetadataMap();
978 }
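The decimal branch above rescales the per-chunk double extrema back into the column's stored integer representation before handing them to the encoder. A small worked example, assuming a DECIMAL column with scale 2 and values chosen to be exact in binary floating point:

 double min_double_per_chunk = 12.25;   // hypothetical per-chunk minimum
 double max_double_per_chunk = 987.5;   // hypothetical per-chunk maximum
 int64_t min_stored = (int64_t)(min_double_per_chunk * pow(10, 2));  // 1225
 int64_t max_stored = (int64_t)(max_double_per_chunk * pow(10, 2));  // 98750
 // min_stored and max_stored are what update_stats() feeds to encoder->updateStats().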

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateColumns ( const Catalog_Namespace::Catalog *  catalog,
const TableDescriptor *  td,
const int  fragmentId,
const std::vector< TargetMetaInfo >  sourceMetaInfo,
const std::vector< const ColumnDescriptor * >  columnDescriptors,
const RowDataProvider &  sourceDataProvider,
const size_t  indexOffFragmentOffsetColumn,
const Data_Namespace::MemoryLevel  memoryLevel,
UpdelRoll &  updelRoll,
Executor *  executor 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 290 of file UpdelStorage.cpp.

References UpdelRoll::catalog, CHECK(), checked_get(), cpu_threads(), TargetValueConverterFactory::create(), Fragmenter_Namespace::InsertData::databaseId, Catalog_Namespace::DBMetadata::dbId, UpdelRoll::dirtyChunkeys, UpdelRoll::dirtyChunks, g_enable_experimental_string_functions, Fragmenter_Namespace::get_chunks(), get_logical_type_info(), SQLTypeInfo::get_size(), Catalog_Namespace::Catalog::getCurrentDB(), Fragmenter_Namespace::RowDataProvider::getEntryAt(), Fragmenter_Namespace::RowDataProvider::getEntryCount(), getFragmentInfo(), Fragmenter_Namespace::RowDataProvider::getLiteralDictionary(), Catalog_Namespace::Catalog::getLogicalTableId(), Fragmenter_Namespace::RowDataProvider::getRowCount(), insertDataNoCheckpoint(), SQLTypeInfo::is_string(), UpdelRoll::is_varlen_update, kLINESTRING, kMULTIPOLYGON, kPOINT, kPOLYGON, UpdelRoll::logicalTableId, UpdelRoll::memoryLevel, num_rows, Fragmenter_Namespace::InsertData::numRows, TableDescriptor::tableId, and Fragmenter_Namespace::InsertData::tableId.

300  {
301  updelRoll.is_varlen_update = true;
302  updelRoll.catalog = catalog;
303  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
304  updelRoll.memoryLevel = memoryLevel;
305 
306  size_t num_entries = sourceDataProvider.getEntryCount();
307  size_t num_rows = sourceDataProvider.getRowCount();
308 
309  if (0 == num_rows) {
310  // bail out early
311  return;
312  }
313 
314  TargetValueConverterFactory factory;
315 
316  auto fragment_ptr = getFragmentInfo(fragmentId);
317  auto& fragment = *fragment_ptr;
318  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
319  get_chunks(catalog, td, fragment, memoryLevel, chunks);
320  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
321  columnDescriptors.size());
322  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
323 
324  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
325  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
326  auto chunk = chunks[indexOfChunk];
327  const auto chunk_cd = chunk->getColumnDesc();
328 
329  if (chunk_cd->isDeletedCol) {
330  deletedChunk = chunk;
331  continue;
332  }
333 
334  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
335  columnDescriptors.end(),
336  [=](const ColumnDescriptor* cd) -> bool {
337  return cd->columnId == chunk_cd->columnId;
338  });
339 
340  if (targetColumnIt != columnDescriptors.end()) {
341  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
342 
343  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
344  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
345 
346  ConverterCreateParameter param{
347  num_rows,
348  *catalog,
349  sourceDataMetaInfo,
350  targetDescriptor,
351  targetDescriptor->columnType,
352  !targetDescriptor->columnType.get_notnull(),
353  sourceDataProvider.getLiteralDictionary(),
354  g_enable_experimental_string_functions
355  ? executor->getStringDictionaryProxy(
356  sourceDataMetaInfo.get_type_info().get_comp_param(),
357  executor->getRowSetMemoryOwner(),
358  true)
359  : nullptr};
360  auto converter = factory.create(param);
361  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
362 
363  if (targetDescriptor->columnType.is_geometry()) {
364  // geometry columns are composites
365  // need to skip chunks, depending on geo type
366  switch (targetDescriptor->columnType.get_type()) {
367  case kMULTIPOLYGON:
368  indexOfChunk += 5;
369  break;
370  case kPOLYGON:
371  indexOfChunk += 4;
372  break;
373  case kLINESTRING:
374  indexOfChunk += 2;
375  break;
376  case kPOINT:
377  indexOfChunk += 1;
378  break;
379  default:
380  CHECK(false); // not supported
381  }
382  }
383  } else {
384  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
385  std::unique_ptr<ChunkToInsertDataConverter> converter;
386 
387  if (chunk_cd->columnType.is_fixlen_array()) {
388  converter =
389  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
390  } else if (chunk_cd->columnType.is_string()) {
391  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
392  } else if (chunk_cd->columnType.is_geometry()) {
393  // the logical geo column is a string column
394  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
395  } else {
396  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
397  }
398 
399  chunkConverters.push_back(std::move(converter));
400 
401  } else if (chunk_cd->columnType.is_date_in_days()) {
402  /* Q: Why do we need this?
403  A: In the variable-length update path we move the chunk content of the column
404  without decoding it. Since it again passes through DateDaysEncoder,
405  the expected value should be in seconds, but here it is in days.
406  Therefore the DateChunkConverter scales the chunk values to seconds,
407  which DateDaysEncoder then ultimately encodes back into days.
408  */
409  std::unique_ptr<ChunkToInsertDataConverter> converter;
410  const size_t physical_size = chunk_cd->columnType.get_size();
411  if (physical_size == 2) {
412  converter =
413  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
414  } else if (physical_size == 4) {
415  converter =
416  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
417  } else {
418  CHECK(false);
419  }
420  chunkConverters.push_back(std::move(converter));
421  } else {
422  std::unique_ptr<ChunkToInsertDataConverter> converter;
423  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
424  int logical_size = logical_type.get_size();
425  int physical_size = chunk_cd->columnType.get_size();
426 
427  if (logical_type.is_string()) {
428  // for dicts -> logical = physical
429  logical_size = physical_size;
430  }
431 
432  if (8 == physical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
434  num_rows, chunk.get());
435  } else if (4 == physical_size) {
436  if (8 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
438  num_rows, chunk.get());
439  } else {
440  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
441  num_rows, chunk.get());
442  }
443  } else if (2 == chunk_cd->columnType.get_size()) {
444  if (8 == logical_size) {
445  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
446  num_rows, chunk.get());
447  } else if (4 == logical_size) {
448  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
449  num_rows, chunk.get());
450  } else {
451  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
452  num_rows, chunk.get());
453  }
454  } else if (1 == chunk_cd->columnType.get_size()) {
455  if (8 == logical_size) {
456  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
457  num_rows, chunk.get());
458  } else if (4 == logical_size) {
459  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
460  num_rows, chunk.get());
461  } else if (2 == logical_size) {
462  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
463  num_rows, chunk.get());
464  } else {
465  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
466  num_rows, chunk.get());
467  }
468  } else {
469  CHECK(false); // unknown
470  }
471 
472  chunkConverters.push_back(std::move(converter));
473  }
474  }
475  }
476 
477  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
478  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
479 
480  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
481  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
482  deletedChunk->getColumnDesc()->tableId,
483  deletedChunk->getColumnDesc()->columnId,
484  fragment.fragmentId};
485  updelRoll.dirtyChunkeys.insert(chunkey);
486  bool* deletedChunkBuffer =
487  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
488 
489  std::atomic<size_t> row_idx{0};
490 
491  auto row_converter = [&sourceDataProvider,
492  &sourceDataConverters,
493  &indexOffFragmentOffsetColumn,
494  &chunkConverters,
495  &deletedChunkBuffer,
496  &row_idx](size_t indexOfEntry) -> void {
497  // convert the source data
498  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
499  if (row.empty()) {
500  return;
501  }
502 
503  size_t indexOfRow = row_idx.fetch_add(1);
504 
505  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
506  if (sourceDataConverters[col]) {
507  const auto& mapd_variant = row[col];
508  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
509  }
510  }
511 
512  auto scalar = checked_get(
513  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
514  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
515 
516  // convert the remaining chunks
517  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
518  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
519  }
520 
521  // now mark the row as deleted
522  deletedChunkBuffer[indexInChunkBuffer] = true;
523  };
524 
525  bool can_go_parallel = num_rows > 20000;
526 
527  if (can_go_parallel) {
528  const size_t num_worker_threads = cpu_threads();
529  std::vector<std::future<void>> worker_threads;
530  for (size_t i = 0,
531  start_entry = 0,
532  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
533  i < num_worker_threads && start_entry < num_entries;
534  ++i, start_entry += stride) {
535  const auto end_entry = std::min(start_entry + stride, num_rows);
536  worker_threads.push_back(std::async(
537  std::launch::async,
538  [&row_converter](const size_t start, const size_t end) {
539  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
540  row_converter(indexOfRow);
541  }
542  },
543  start_entry,
544  end_entry));
545  }
546 
547  for (auto& child : worker_threads) {
548  child.wait();
549  }
550 
551  } else {
552  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
553  row_converter(entryIdx);
554  }
555  }
556 
557  InsertData insert_data;
558  insert_data.databaseId = catalog->getCurrentDB().dbId;
559  insert_data.tableId = td->tableId;
560 
561  for (size_t i = 0; i < chunkConverters.size(); i++) {
562  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
563  continue;
564  }
565 
566  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
567  if (sourceDataConverters[i]) {
568  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
569  }
570  continue;
571  }
572 
573  insert_data.numRows = num_rows;
574  insertDataNoCheckpoint(insert_data);
575 
576  // update metadata
577  if (!deletedChunk->getBuffer()->has_encoder) {
578  deletedChunk->initEncoder();
579  }
580  deletedChunk->getBuffer()->encoder->updateStats(static_cast<int64_t>(true), false);
581 
582  auto& shadowDeletedChunkMeta =
583  fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
584  if (shadowDeletedChunkMeta->numElements >
585  deletedChunk->getBuffer()->encoder->getNumElems()) {
586  // the append will have populated shadow meta data, otherwise use existing num
587  // elements
588  deletedChunk->getBuffer()->encoder->setNumElems(shadowDeletedChunkMeta->numElements);
589  }
590  deletedChunk->getBuffer()->setUpdated();
591 }
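The date-in-days comment in the listing (source lines 402-408) can be made concrete with a small sketch; it assumes the usual 86,400-seconds-per-day conversion performed by DateConverters::get_epoch_seconds_from_days:

 int16_t stored_days = 18262;   // hypothetical raw value of a 16-bit date-in-days chunk (2020-01-01)
 int64_t epoch_seconds = static_cast<int64_t>(stored_days) * 86400;  // 1577836800, i.e. 2020-01-01 00:00:00 UTC
 // The DateChunkConverter path widens values like this so that DateDaysEncoder,
 // which expects seconds on input, re-encodes them back into days on insert.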

+ Here is the call graph for this function:

void Fragmenter_Namespace::InsertOrderFragmenter::updateMetadata ( const Catalog_Namespace::Catalog *  catalog,
const MetaDataKey &  key,
UpdelRoll &  updel_roll 
)
overridevirtual

Implements Fragmenter_Namespace::AbstractFragmenter.

Definition at line 980 of file UpdelStorage.cpp.

References UpdelRoll::chunkMetadata, fragmentInfoMutex_, and UpdelRoll::numTuples.

982  {
983  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
984  if (updel_roll.chunkMetadata.count(key)) {
985  auto& fragmentInfo = *key.second;
986  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
987  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
988  fragmentInfo.setChunkMetadataMap(chunkMetadata);
989  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
990  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
991  }
992 }
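updateMetadata applies the chunk metadata and tuple counts that earlier update calls staged in the UpdelRoll for one (TableDescriptor, FragmentInfo) key. A hedged sketch of the commit pattern a caller might follow once all updates have been staged; the iteration and key layout are assumptions based on the std::make_pair(td, &fragment) usage in updateColumnMetadata above:

 // After one or more updateColumn() calls have populated updel_roll:
 for (const auto& kv : updel_roll.chunkMetadata) {
   const auto& key = kv.first;  // pair of TableDescriptor* and FragmentInfo*
   key.first->fragmenter->updateMetadata(&catalog, key, updel_roll);
 }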
auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_fixlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1092 of file UpdelStorage.cpp.

References anonymous_namespace{ResultSetReductionInterpreter.cpp}::get_element_size(), and Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1095  {
1096  const auto cd = chunk->getColumnDesc();
1097  const auto& col_type = cd->columnType;
1098  auto data_buffer = chunk->getBuffer();
1099  auto data_addr = data_buffer->getMemoryPtr();
1100  auto element_size =
1101  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1102  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1103  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1104  size_t nbytes_fix_data_to_keep = 0;
1105  auto nrows_to_vacuum = frag_offsets.size();
1106  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1107  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1108  auto is_last_one = irow == nrows_to_vacuum;
1109  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1110  auto maddr_to_vacuum = data_addr;
1111  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1112  if (nrows_to_keep > 0) {
1113  auto nbytes_to_keep = nrows_to_keep * element_size;
1114  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1115  // move curr fixlen row block toward front
1116  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1117  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1118  nbytes_to_keep);
1119  }
1120  irow_of_blk_to_fill += nrows_to_keep;
1121  nbytes_fix_data_to_keep += nbytes_to_keep;
1122  }
1123  irow_of_blk_to_keep = irow_to_vacuum + 1;
1124  }
1125  return nbytes_fix_data_to_keep;
1126 }
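The loop above slides each surviving block of fixed-width rows toward the front of the data buffer. A self-contained sketch of the same idea (not the production code; buffer contents and deleted offsets are hypothetical):

 #include <cstdint>
 #include <cstring>
 #include <vector>

 // Drop the rows listed (sorted, unique) in deleted_offsets from a fixed-width
 // buffer by memmove-ing the surviving blocks forward. Returns the bytes kept.
 size_t compact_fixlen(int8_t* data, size_t nrows, size_t element_size,
                       const std::vector<uint64_t>& deleted_offsets) {
   size_t fill_row = 0;    // next row slot to fill
   size_t keep_row = 0;    // head of the next surviving block
   size_t bytes_kept = 0;
   for (size_t i = 0; i <= deleted_offsets.size(); ++i) {
     const size_t next_deleted =
         (i == deleted_offsets.size()) ? nrows : deleted_offsets[i];
     const size_t nkeep = next_deleted - keep_row;
     if (nkeep > 0) {
       std::memmove(data + fill_row * element_size,
                    data + keep_row * element_size,
                    nkeep * element_size);
       fill_row += nkeep;
       bytes_kept += nkeep * element_size;
     }
     keep_row = next_deleted + 1;  // skip over the deleted row
   }
   return bytes_kept;
 }

 // With int32_t rows {10, 11, 12, 13, 14} and deleted_offsets {1, 3}, the buffer
 // becomes {10, 12, 14, ...} and the function returns 12 (three 4-byte rows kept).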

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

auto Fragmenter_Namespace::InsertOrderFragmenter::vacuum_varlen_rows ( const FragmentInfo &  fragment,
const std::shared_ptr< Chunk_NS::Chunk > &  chunk,
const std::vector< uint64_t > &  frag_offsets 
)
protected

Definition at line 1128 of file UpdelStorage.cpp.

References Fragmenter_Namespace::FragmentInfo::getPhysicalNumTuples().

Referenced by compactRows().

1131  {
1132  auto data_buffer = chunk->getBuffer();
1133  auto index_buffer = chunk->getIndexBuf();
1134  auto data_addr = data_buffer->getMemoryPtr();
1135  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1136  auto index_array = (StringOffsetT*)indices_addr;
1137  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1138  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1139  size_t nbytes_fix_data_to_keep = 0;
1140  size_t nbytes_var_data_to_keep = 0;
1141  auto nrows_to_vacuum = frag_offsets.size();
1142  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1143  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1144  auto is_last_one = irow == nrows_to_vacuum;
1145  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1146  auto maddr_to_vacuum = data_addr;
1147  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1148  if (nrows_to_keep > 0) {
1149  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1150  auto nbytes_to_keep =
1151  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1152  index_array[irow_of_blk_to_keep];
1153  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1154  // move curr varlen row block toward front
1155  memmove(data_addr + ibyte_var_data_to_keep,
1156  data_addr + index_array[irow_of_blk_to_keep],
1157  nbytes_to_keep);
1158 
1159  const auto index_base = index_array[irow_of_blk_to_keep];
1160  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1161  auto& index = index_array[irow_of_blk_to_keep + i];
1162  index = ibyte_var_data_to_keep + (index - index_base);
1163  }
1164  }
1165  nbytes_var_data_to_keep += nbytes_to_keep;
1166  maddr_to_vacuum = indices_addr;
1167 
1168  constexpr static auto index_element_size = sizeof(StringOffsetT);
1169  nbytes_to_keep = nrows_to_keep * index_element_size;
1170  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1171  // move curr fixlen row block toward front
1172  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1173  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1174  nbytes_to_keep);
1175  }
1176  irow_of_blk_to_fill += nrows_to_keep;
1177  nbytes_fix_data_to_keep += nbytes_to_keep;
1178  }
1179  irow_of_blk_to_keep = irow_to_vacuum + 1;
1180  }
1181  return nbytes_var_data_to_keep;
1182 }
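For intuition on the offset rebasing above: suppose five rows have variable-length payloads of 3, 5, 2, 4, and 1 bytes (index_array = {0, 3, 8, 10, 14}, data size 15) and row 1 is vacuumed. The surviving block of rows 2-4 (7 bytes starting at offset 8) is memmoved to offset 3, each of its offset entries is rewritten as nbytes_var_data_to_keep + (old_offset - index_array[head_of_block]), giving {3, 5, 9}, and the fixed-width offset array itself is then compacted with the same forward memmove used for the fixed-length case.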

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

Catalog_Namespace::Catalog* Fragmenter_Namespace::InsertOrderFragmenter::catalog_
protected
std::vector<int> Fragmenter_Namespace::InsertOrderFragmenter::chunkKeyPrefix_
protected

Definition at line 186 of file InsertOrderFragmenter.h.

Referenced by getChunkKeyPrefix(), and getFragmenterId().

std::map<int, Chunk_NS::Chunk> Fragmenter_Namespace::InsertOrderFragmenter::columnMap_
protected

stores a map of column id to metadata about that column

Definition at line 188 of file InsertOrderFragmenter.h.

Data_Namespace::DataMgr* Fragmenter_Namespace::InsertOrderFragmenter::dataMgr_
protected

Definition at line 192 of file InsertOrderFragmenter.h.

Data_Namespace::MemoryLevel Fragmenter_Namespace::InsertOrderFragmenter::defaultInsertLevel_
protected

Definition at line 209 of file InsertOrderFragmenter.h.

std::string Fragmenter_Namespace::InsertOrderFragmenter::fragmenterType_
protected

Definition at line 203 of file InsertOrderFragmenter.h.

Referenced by getFragmenterType().

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoMutex_
protected

Definition at line 205 of file InsertOrderFragmenter.h.

Referenced by updateMetadata().

std::deque<std::unique_ptr<FragmentInfo> > Fragmenter_Namespace::InsertOrderFragmenter::fragmentInfoVec_
protected

data about each fragment stored - id and number of rows

Definition at line 190 of file InsertOrderFragmenter.h.

bool Fragmenter_Namespace::InsertOrderFragmenter::hasMaterializedRowId_
protected

Definition at line 211 of file InsertOrderFragmenter.h.

mapd_shared_mutex Fragmenter_Namespace::InsertOrderFragmenter::insertMutex_
protected

Definition at line 207 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxChunkSize_
protected

Definition at line 201 of file InsertOrderFragmenter.h.

int Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentId_
protected

Definition at line 200 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxFragmentRows_
protected

Definition at line 196 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::maxRows_
protected

Definition at line 202 of file InsertOrderFragmenter.h.

std::shared_ptr<std::mutex> Fragmenter_Namespace::InsertOrderFragmenter::mutex_access_inmem_states
protected

Definition at line 214 of file InsertOrderFragmenter.h.

size_t Fragmenter_Namespace::InsertOrderFragmenter::numTuples_
protected

Definition at line 199 of file InsertOrderFragmenter.h.

Referenced by getNumRows(), and setNumRows().

size_t Fragmenter_Namespace::InsertOrderFragmenter::pageSize_
protected

Definition at line 197 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::physicalTableId_
protected
int Fragmenter_Namespace::InsertOrderFragmenter::rowIdColId_
protected

Definition at line 212 of file InsertOrderFragmenter.h.

const int Fragmenter_Namespace::InsertOrderFragmenter::shard_
protected

Definition at line 195 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

std::mutex Fragmenter_Namespace::InsertOrderFragmenter::temp_mutex_
mutableprotected

Definition at line 237 of file InsertOrderFragmenter.h.

Referenced by updateColumn().

const bool Fragmenter_Namespace::InsertOrderFragmenter::uses_foreign_storage_
protected

Definition at line 210 of file InsertOrderFragmenter.h.

std::unordered_map<int, size_t> Fragmenter_Namespace::InsertOrderFragmenter::varLenColInfo_
protected

Definition at line 213 of file InsertOrderFragmenter.h.


The documentation for this class was generated from the following files: