OmniSciDB  94e8789169
InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Fragmenter/InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
31 #include "DataMgr/FileMgr/GlobalFileMgr.h"
32 #include "LockMgr/LockMgr.h"
33 #include "Logger/Logger.h"
34 #include "Shared/checked_alloc.h"
35 #include "Shared/thread_count.h"
36 
37 #define DROP_FRAGMENT_FACTOR \
38  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
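// Illustrative arithmetic (not part of the original source): with maxRows = 1,000,000 the
// fragmenter trims until the row count is at or below roughly 970,000, so the very next
// insert has headroom and does not immediately trigger another round of fragment drops.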
39 
40 using Chunk_NS::Chunk;
41 using Fragmenter_Namespace::FragmentInfo;
42 using Fragmenter_Namespace::InsertOrderFragmenter;
43 
44 bool g_use_table_device_offset{true};
45 
46 using namespace std;
47 
48 namespace Fragmenter_Namespace {
49 
50 InsertOrderFragmenter::InsertOrderFragmenter(
51  const vector<int> chunkKeyPrefix,
52  vector<Chunk>& chunkVec,
53  Data_Namespace::DataMgr* dataMgr,
54  Catalog_Namespace::Catalog* catalog,
55  const int physicalTableId,
56  const int shard,
57  const size_t maxFragmentRows,
58  const size_t maxChunkSize,
59  const size_t pageSize,
60  const size_t maxRows,
61  const Data_Namespace::MemoryLevel defaultInsertLevel,
62  const bool uses_foreign_storage)
63  : chunkKeyPrefix_(chunkKeyPrefix)
64  , dataMgr_(dataMgr)
65  , catalog_(catalog)
66  , physicalTableId_(physicalTableId)
67  , shard_(shard)
68  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
69  , pageSize_(pageSize)
70  , numTuples_(0)
71  , maxFragmentId_(-1)
72  , maxChunkSize_(maxChunkSize)
73  , maxRows_(maxRows)
74  , fragmenterType_("insert_order")
75  , defaultInsertLevel_(defaultInsertLevel)
76  , uses_foreign_storage_(uses_foreign_storage)
77  , hasMaterializedRowId_(false)
78  , mutex_access_inmem_states(new std::mutex) {
79  // Note that the Fragmenter is not passed virtual columns and so should only
80  // find the rowid column if it is non-virtual
81 
82  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
83  int columnId = colIt->getColumnDesc()->columnId;
84  columnMap_[columnId] = *colIt;
85  if (colIt->getColumnDesc()->columnName == "rowid") {
86  hasMaterializedRowId_ = true;
87  rowIdColId_ = columnId;
88  }
89  }
90  conditionallyInstantiateFileMgrWithParams();
91  getChunkMetadata();
92 }
93 
94 InsertOrderFragmenter::~InsertOrderFragmenter() {}
95 
96 namespace {
97 
102 int compute_device_for_fragment(const int table_id,
103  const int fragment_id,
104  const int num_devices) {
105  if (g_use_table_device_offset) {
106  return (table_id + fragment_id) % num_devices;
107  } else {
108  return fragment_id % num_devices;
109  }
110 }
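// Worked example (illustrative, not part of the original source): with num_devices = 2 and
// table_id = 5, fragments 0..3 land on devices 1, 0, 1, 0 when the table offset is applied,
// versus 0, 1, 0, 1 without it; offsetting by table_id spreads single-fragment tables across
// devices instead of piling them all onto device 0.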
111 
112 } // namespace
113 
114 void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
115  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
116  // first use by Fragmenter, and until the maxRollbackEpochs param was added, no
117  // per-table options were set in storage
118  if (defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
119  const TableDescriptor* td =
120  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
121  if (td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) {
122  File_Namespace::FileMgrParams fileMgrParams;
123  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
124  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
125  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
126  }
127  }
128 }
129 
130 void InsertOrderFragmenter::getChunkMetadata() {
131  if (uses_foreign_storage_ ||
132  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
133  // memory-resident tables won't have anything on disk
134  ChunkMetadataVector chunk_metadata;
135  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
136 
137  // data comes like this - database_id, table_id, column_id, fragment_id
138  // but let's sort by database_id, table_id, fragment_id, column_id
139 
140  int fragment_subkey_index = 3;
141  std::sort(chunk_metadata.begin(),
142  chunk_metadata.end(),
143  [&](const auto& pair1, const auto& pair2) {
144  return pair1.first[3] < pair2.first[3];
145  });
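// Illustrative example (not part of the original source): keys arriving as
// (db, tbl, col 1, frag 0), (db, tbl, col 2, frag 0), (db, tbl, col 1, frag 1), ... are
// reordered so that all chunks of fragment 0 precede those of fragment 1, letting the loop
// below build one FragmentInfo per fragment in a single pass.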
146 
147  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
148  ++chunk_itr) {
149  int cur_column_id = chunk_itr->first[2];
150  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
151 
152  if (fragmentInfoVec_.empty() ||
153  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
154  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
155  CHECK(new_fragment_info);
156  maxFragmentId_ = cur_fragment_id;
157  new_fragment_info->fragmentId = cur_fragment_id;
158  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
159  numTuples_ += new_fragment_info->getPhysicalNumTuples();
160  for (const auto level_size : dataMgr_->levelSizes_) {
161  new_fragment_info->deviceIds.push_back(
162  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
163  }
164  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
165  new_fragment_info->physicalTableId = physicalTableId_;
166  new_fragment_info->shard = shard_;
167  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
168  } else {
169  if (chunk_itr->second->numElements !=
170  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
171  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
172  std::to_string(physicalTableId_) + ", Column " +
173  std::to_string(cur_column_id) + ". Fragment Tuples: " +
174  std::to_string(
175  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
176  ", Chunk Tuples: " +
177  std::to_string(chunk_itr->second->numElements);
178  }
179  }
180  CHECK(fragmentInfoVec_.back().get());
181  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
182  }
183  }
184 
185  size_t maxFixedColSize = 0;
186 
187  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
188  auto size = colIt->second.getColumnDesc()->columnType.get_size();
189  if (size == -1) { // variable length
190  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
191  size = 8; // b/c we use this for string and array indices - gross to have magic
192  // number here
193  }
194  CHECK_GE(size, 0);
195  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
196  }
197 
198  // this is maximum number of rows assuming everything is fixed length
199  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
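// Illustrative example (not part of the original source): with maxChunkSize_ = 1 GiB and a
// widest fixed-width column of 8 bytes, the cap above limits a fragment to
// 1,073,741,824 / 8 = 134,217,728 rows even if maxFragmentRows_ was configured higher.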
200 
201  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
202  // Now need to get the insert buffers for each column - should be last
203  // fragment
204  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
205  // TODO: add accessor here for safe indexing
206  int deviceId =
207  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
208  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
209  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
210  insertKey.push_back(colIt->first); // column id
211  insertKey.push_back(lastFragmentId); // fragment id
212  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
213  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
214  if (varLenColInfoIt != varLenColInfo_.end()) {
215  varLenColInfoIt->second = colIt->second.getBuffer()->size();
216  }
217  }
218  }
219 }
220 
221 void InsertOrderFragmenter::dropFragmentsToSize(const size_t maxRows) {
222  // not safe to call from outside insertData
223  // b/c depends on insertLock around numTuples_
224 
225  // don't ever drop the only fragment!
226  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
227  return;
228  }
229 
230  if (numTuples_ > maxRows) {
231  size_t preNumTuples = numTuples_;
232  vector<int> dropFragIds;
233  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
234  while (numTuples_ > targetRows) {
235  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
236  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
237  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
238  fragmentInfoVec_.pop_front();
239  CHECK_GE(numTuples_, numFragTuples);
240  numTuples_ -= numFragTuples;
241  }
242  deleteFragments(dropFragIds);
243  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
244  << " post: " << numTuples_ << " maxRows: " << maxRows;
245  }
246 }
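// Illustrative walk-through (not part of the original source): with maxRows = 100 and three
// fragments holding 40, 40, and 30 tuples (numTuples_ = 110), the target becomes
// 100 * 0.97 = 97, so only the oldest 40-tuple fragment is dropped and numTuples_ falls to 70.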
247 
248 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
249  // Fix a verified loophole on sharded logical tables: the logical table is locked using
250  // its logical tableId, while it is the physical tables that reach this point when
251  // fragments overflow during COPY. Locks on a logical table and its physical tables
252  // never intersect, which means potential races. It would be overkill to resolve a
253  // logical table to its physical tables in DBHandler, ParseNode, or other higher layers
254  // where the logical table is locked with Table Read/Write locks; it is easier to lock
255  // the logical table on behalf of its physical tables here. A downside of this approach
256  // is the loss of parallel execution of deleteFragments across physical tables, but
257  // because deleteFragments is a short in-memory operation, that loss is not a big deal.
258  auto chunkKeyPrefix = chunkKeyPrefix_;
259  if (shard_ >= 0) {
260  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
261  }
262 
263  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
264  // SELECT and COPY may enter a deadlock
265  const auto delete_lock =
266  lockmgr::TableDataLockMgr::getWriteLockForTable(chunkKeyPrefix);
267 
268  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
269 
270  for (const auto fragId : dropFragIds) {
271  for (const auto& col : columnMap_) {
272  int colId = col.first;
273  vector<int> fragPrefix = chunkKeyPrefix_;
274  fragPrefix.push_back(colId);
275  fragPrefix.push_back(fragId);
276  dataMgr_->deleteChunksWithPrefix(fragPrefix);
277  }
278  }
279 }
280 
281 void InsertOrderFragmenter::updateColumnChunkMetadata(
282  const ColumnDescriptor* cd,
283  const int fragment_id,
284  const std::shared_ptr<ChunkMetadata> metadata) {
285  // synchronize concurrent accesses to fragmentInfoVec_
286  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
287 
288  CHECK(metadata.get());
289  auto fragment_info = getFragmentInfo(fragment_id);
290  CHECK(fragment_info);
291  fragment_info->setChunkMetadata(cd->columnId, metadata);
292 }
293 
294 void InsertOrderFragmenter::updateChunkStats(
295  const ColumnDescriptor* cd,
296  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map) {
297  // synchronize concurrent accesses to fragmentInfoVec_
298  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
304  if (shard_ >= 0) {
305  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
306  }
307 
308  CHECK(cd);
309  const auto column_id = cd->columnId;
310  const auto col_itr = columnMap_.find(column_id);
311  CHECK(col_itr != columnMap_.end());
312 
313  for (auto const& fragment : fragmentInfoVec_) {
314  auto stats_itr = stats_map.find(fragment->fragmentId);
315  if (stats_itr != stats_map.end()) {
316  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
317  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
318  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
319  physicalTableId_,
320  column_id,
321  fragment->fragmentId};
322  auto chunk = Chunk_NS::Chunk::getChunk(cd,
323  &catalog_->getDataMgr(),
324  chunk_key,
325  defaultInsertLevel_,
326  0,
327  chunk_meta_it->second->numBytes,
328  chunk_meta_it->second->numElements);
329  auto buf = chunk->getBuffer();
330  CHECK(buf);
331  if (!buf->hasEncoder()) {
332  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
333  }
334  auto encoder = buf->getEncoder();
335 
336  auto chunk_stats = stats_itr->second;
337 
338  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
339  encoder->getMetadata(old_chunk_metadata);
340  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
341 
342  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
343  // Use the logical type to display data, since the encoding should be ignored
344  const auto logical_ti = cd->columnType.is_dict_encoded_string()
345  ? SQLTypeInfo(kBIGINT, false)
346  : get_logical_type_info(cd->columnType);
347  if (!didResetStats) {
348  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
349  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
350  << DatumToString(chunk_stats.max, logical_ti);
351  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
352  << DatumToString(chunk_stats.min, logical_ti);
353  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
354  continue; // move to next fragment
355  }
356 
357  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
358  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
359  << DatumToString(chunk_stats.max, logical_ti);
360  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
361  << DatumToString(chunk_stats.min, logical_ti);
362  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
363 
364  // Reset fragment metadata map and set buffer to dirty
365  auto new_metadata = std::make_shared<ChunkMetadata>();
366  // Run through fillChunkStats to ensure any transformations to the raw metadata
367  // values get applied (e.g. for date in days)
368  encoder->getMetadata(new_metadata);
369 
370  fragment->setChunkMetadata(column_id, new_metadata);
371  fragment->shadowChunkMetadataMap =
372  fragment->getChunkMetadataMap(); // TODO(adb): needed?
373  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
374  buf->setDirty();
375  }
376  } else {
377  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
378  << ", table " << physicalTableId_
379  << ", column " << column_id;
380  }
381  }
382 }
383 
384 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
385  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
386  fragmentInfoVec_.end(),
387  [fragment_id](const auto& fragment) -> bool {
388  return fragment->fragmentId == fragment_id;
389  });
390  CHECK(fragment_it != fragmentInfoVec_.end());
391  return fragment_it->get();
392 }
393 
394 void InsertOrderFragmenter::insertData(InsertData& insertDataStruct) {
395  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
396  try {
397  mapd_unique_lock<mapd_shared_mutex> insertLock(
398  insertMutex_); // prevent two threads from trying to insert into the same table
399  // simultaneously
400 
401  insertDataImpl(insertDataStruct);
402 
403  if (defaultInsertLevel_ ==
404  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
405  dataMgr_->checkpoint(
406  chunkKeyPrefix_[0],
407  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
408  }
409  } catch (...) {
410  auto table_epochs =
411  catalog_->getTableEpochs(insertDataStruct.databaseId, insertDataStruct.tableId);
412 
413  // the statement below deletes *this* object!
414  // relying on exception propagation at this stage
415  // until we can sort this out in a cleaner fashion
416  catalog_->setTableEpochs(insertDataStruct.databaseId, table_epochs);
417  throw;
418  }
419 }
420 
421 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insertDataStruct) {
422  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
423  mapd_unique_lock<mapd_shared_mutex> insertLock(
424  insertMutex_); // prevent two threads from trying to insert into the same table
425  // simultaneously
426  insertDataImpl(insertDataStruct);
427 }
428 
429 void InsertOrderFragmenter::replicateData(const InsertData& insertDataStruct) {
430  // synchronize concurrent accesses to fragmentInfoVec_
431  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
432  size_t numRowsLeft = insertDataStruct.numRows;
433  for (auto const& fragmentInfo : fragmentInfoVec_) {
434  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
435  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
436  size_t numRowsCanBeInserted;
437  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
438  if (insertDataStruct.bypass[i]) {
439  continue;
440  }
441  auto columnId = insertDataStruct.columnIds[i];
442  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
443  CHECK(colDesc);
444  CHECK(columnMap_.find(columnId) != columnMap_.end());
445 
446  ChunkKey chunkKey = chunkKeyPrefix_;
447  chunkKey.push_back(columnId);
448  chunkKey.push_back(fragmentInfo->fragmentId);
449 
450  auto colMapIt = columnMap_.find(columnId);
451  auto& chunk = colMapIt->second;
452  if (chunk.isChunkOnDevice(
453  dataMgr_,
454  chunkKey,
455  defaultInsertLevel_,
456  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
457  dataMgr_->deleteChunksWithPrefix(chunkKey);
458  }
459  chunk.createChunkBuffer(
460  dataMgr_,
461  chunkKey,
462  defaultInsertLevel_,
463  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
464  chunk.initEncoder();
465 
466  try {
467  DataBlockPtr dataCopy = insertDataStruct.data[i];
468  auto size = colDesc->columnType.get_size();
469  if (0 > size) {
470  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
471  varLenColInfo_[columnId] = 0;
472  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
473  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
474  } else {
475  numRowsCanBeInserted = maxChunkSize_ / size;
476  }
477 
478  // FIXME: abort a case in which new column is wider than existing columns
479  if (numRowsCanBeInserted < numRowsToInsert) {
480  throw std::runtime_error("new column '" + colDesc->columnName +
481  "' wider than existing columns is not supported");
482  }
483 
484  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
485  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
486 
487  // update total size of var-len column in (actually the last) fragment
488  if (0 > size) {
489  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
490  varLenColInfo_[columnId] = chunk.getBuffer()->size();
491  }
492  } catch (...) {
493  dataMgr_->deleteChunksWithPrefix(chunkKey);
494  throw;
495  }
496  }
497  numRowsLeft -= numRowsToInsert;
498  }
499  CHECK(0 == numRowsLeft);
500 
501  for (auto const& fragmentInfo : fragmentInfoVec_) {
502  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
503  }
504 }
505 
506 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
507  // prevent concurrent insert rows and drop column
508  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
509  // synchronize concurrent accesses to fragmentInfoVec_
510  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
511  for (auto const& fragmentInfo : fragmentInfoVec_) {
512  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
513  }
514 
515  for (const auto columnId : columnIds) {
516  auto cit = columnMap_.find(columnId);
517  if (columnMap_.end() != cit) {
518  columnMap_.erase(cit);
519  }
520 
521  vector<int> fragPrefix = chunkKeyPrefix_;
522  fragPrefix.push_back(columnId);
523  dataMgr_->deleteChunksWithPrefix(fragPrefix);
524 
525  for (const auto& fragmentInfo : fragmentInfoVec_) {
526  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
527  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
528  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
529  }
530  }
531  }
532  for (const auto& fragmentInfo : fragmentInfoVec_) {
533  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
534  }
535 }
536 
537 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
538  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
539 
540  for (auto const& fragment : fragmentInfoVec_) {
541  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
542  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
543  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
544  if (chunk_stats.max.tinyintval == 1) {
545  return true;
546  }
547  }
548  return false;
549 }
550 
551 void InsertOrderFragmenter::insertDataImpl(InsertData& insertDataStruct) {
552  // populate the deleted system column if it exists, as it will not come from the client
553  // Do not add this magical column in the replicate ALTER TABLE ADD route as
554  // it is not needed and will cause issues
555  std::unique_ptr<int8_t[]> data_for_deleted_column;
556  for (const auto& cit : columnMap_) {
557  if (cit.second.getColumnDesc()->isDeletedCol &&
558  insertDataStruct.replicate_count == 0) {
559  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
560  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
561  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
562  insertDataStruct.columnIds.push_back(cit.second.getColumnDesc()->columnId);
563  break;
564  }
565  }
566  // MAT we need to add a removal of the empty column we pushed onto here
567  // for upstream safety. Should not be a problem but need to fix.
568 
569  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
570  for (const auto columnId : insertDataStruct.columnIds) {
571  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
572  CHECK(columnDesc);
573  if (columnMap_.end() == columnMap_.find(columnId)) {
574  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
575  }
576  }
577 
578  // when replicating (adding) column(s), this path won't work; go the separate route...
579  if (insertDataStruct.replicate_count > 0) {
580  replicateData(insertDataStruct);
581  return;
582  }
583 
584  std::unordered_map<int, int> inverseInsertDataColIdMap;
585  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
586  inverseInsertDataColIdMap.insert(
587  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
588  }
589 
590  size_t numRowsLeft = insertDataStruct.numRows;
591  size_t numRowsInserted = 0;
592  vector<DataBlockPtr> dataCopy =
593  insertDataStruct.data; // bc append data will move ptr forward and this violates
594  // constness of InsertData
595  if (numRowsLeft <= 0) {
596  return;
597  }
598 
599  FragmentInfo* currentFragment{nullptr};
600 
601  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
602  currentFragment = createNewFragment(defaultInsertLevel_);
603  } else {
604  currentFragment = fragmentInfoVec_.back().get();
605  }
606  CHECK(currentFragment);
607 
608  size_t startFragment = fragmentInfoVec_.size() - 1;
609 
610  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
611  // loop until done inserting all rows
612  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
613  size_t rowsLeftInCurrentFragment =
614  maxFragmentRows_ - currentFragment->shadowNumTuples;
615  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
616  if (rowsLeftInCurrentFragment != 0) {
617  for (auto& varLenColInfoIt : varLenColInfo_) {
618  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
619  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
620  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
621  if (insertIdIt != inverseInsertDataColIdMap.end()) {
622  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
623  numRowsToInsert = std::min(
624  numRowsToInsert,
625  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
626  numRowsToInsert,
627  numRowsInserted,
628  bytesLeft));
629  }
630  }
631  }
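// Illustrative note (not part of the original source): if a variable-length column has
// already consumed, say, 900 MB of a 1 GB maxChunkSize_, bytesLeft is only 100 MB and
// getNumElemsForBytesInsertData may shrink numRowsToInsert well below
// rowsLeftInCurrentFragment; when it reaches zero, the branch below rolls over to a new
// fragment.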
632 
633  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
634  currentFragment = createNewFragment(defaultInsertLevel_);
635  if (numRowsInserted == 0) {
636  startFragment++;
637  }
638  rowsLeftInCurrentFragment = maxFragmentRows_;
639  for (auto& varLenColInfoIt : varLenColInfo_) {
640  varLenColInfoIt.second = 0; // reset byte counter
641  }
642  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
643  for (auto& varLenColInfoIt : varLenColInfo_) {
644  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
645  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
646  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
647  if (insertIdIt != inverseInsertDataColIdMap.end()) {
648  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
649  numRowsToInsert = std::min(
650  numRowsToInsert,
651  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
652  numRowsToInsert,
653  numRowsInserted,
654  bytesLeft));
655  }
656  }
657  }
658 
659  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
660  // never be able to insert anything
661 
662  {
663  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
664 
665  // for each column, append the data in the appropriate insert buffer
666  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
667  int columnId = insertDataStruct.columnIds[i];
668  auto colMapIt = columnMap_.find(columnId);
669  CHECK(colMapIt != columnMap_.end());
670  currentFragment->shadowChunkMetadataMap[columnId] =
671  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
672  auto varLenColInfoIt = varLenColInfo_.find(columnId);
673  if (varLenColInfoIt != varLenColInfo_.end()) {
674  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
675  }
676  }
677  if (hasMaterializedRowId_) {
678  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
679  currentFragment->shadowNumTuples;
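// Illustrative example (not part of the original source): with maxFragmentRows_ = 1,000,000,
// fragmentId = 2, and 10 tuples already in the fragment, the materialized rowids for this
// batch start at 2,000,010 and increase by one per inserted row.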
680  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
681  for (size_t i = 0; i < numRowsToInsert; ++i) {
682  row_id_data[i] = i + startId;
683  }
684  DataBlockPtr rowIdBlock;
685  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
686  auto colMapIt = columnMap_.find(rowIdColId_);
687  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
688  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
689  }
690 
691  currentFragment->shadowNumTuples =
692  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
693  numRowsLeft -= numRowsToInsert;
694  numRowsInserted += numRowsToInsert;
695  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
696  partIt != fragmentInfoVec_.end();
697  ++partIt) {
698  auto fragment_ptr = partIt->get();
699  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
700  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
701  }
702  }
703  }
704  numTuples_ += insertDataStruct.numRows;
705  dropFragmentsToSize(maxRows_);
706 }
707 
708 FragmentInfo* InsertOrderFragmenter::createNewFragment(
709  const Data_Namespace::MemoryLevel memoryLevel) {
710  // also sets the new fragment as the insertBuffer for each column
711 
712  maxFragmentId_++;
713  auto newFragmentInfo = std::make_unique<FragmentInfo>();
714  newFragmentInfo->fragmentId = maxFragmentId_;
715  newFragmentInfo->shadowNumTuples = 0;
716  newFragmentInfo->setPhysicalNumTuples(0);
717  for (const auto levelSize : dataMgr_->levelSizes_) {
718  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
719  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
720  }
721  newFragmentInfo->physicalTableId = physicalTableId_;
722  newFragmentInfo->shard = shard_;
723 
724  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
725  colMapIt != columnMap_.end();
726  ++colMapIt) {
727  ChunkKey chunkKey = chunkKeyPrefix_;
728  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
729  chunkKey.push_back(maxFragmentId_);
730  colMapIt->second.createChunkBuffer(
731  dataMgr_,
732  chunkKey,
733  memoryLevel,
734  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
735  pageSize_);
736  colMapIt->second.initEncoder();
737  }
738 
739  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
740  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
741  return fragmentInfoVec_.back().get();
742 }
743 
744 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
745  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
746  TableInfo queryInfo;
747  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
748  // right now we don't test predicate, so just return (copy of) all fragments
749  bool fragmentsExist = false;
750  if (fragmentInfoVec_.empty()) {
751  // If we have no fragments add a dummy empty fragment to make the executor
752  // not have separate logic for 0-row tables
753  int maxFragmentId = 0;
754  FragmentInfo emptyFragmentInfo;
755  emptyFragmentInfo.fragmentId = maxFragmentId;
756  emptyFragmentInfo.shadowNumTuples = 0;
757  emptyFragmentInfo.setPhysicalNumTuples(0);
758  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
759  emptyFragmentInfo.physicalTableId = physicalTableId_;
760  emptyFragmentInfo.shard = shard_;
761  queryInfo.fragments.push_back(emptyFragmentInfo);
762  } else {
763  fragmentsExist = true;
764  std::for_each(
765  fragmentInfoVec_.begin(),
766  fragmentInfoVec_.end(),
767  [&queryInfo](const auto& fragment_owned_ptr) {
768  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
769  });
770  }
771  readLock.unlock();
772  queryInfo.setPhysicalNumTuples(0);
773  auto partIt = queryInfo.fragments.begin();
774  if (fragmentsExist) {
775  while (partIt != queryInfo.fragments.end()) {
776  if (partIt->getPhysicalNumTuples() == 0) {
777  // this means that a concurrent insert query inserted tuples into a new fragment
778  // but when the query came in we didn't have this fragment. To make sure we don't
779  // mess up the executor we delete this fragment from the metadatamap (fixes
780  // earlier bug found 2015-05-08)
781  partIt = queryInfo.fragments.erase(partIt);
782  } else {
783  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
784  partIt->getPhysicalNumTuples());
785  ++partIt;
786  }
787  }
788  } else {
789  // We added a dummy fragment and know the table is empty
790  queryInfo.setPhysicalNumTuples(0);
791  }
792  return queryInfo;
793 }
794 
795 } // namespace Fragmenter_Namespace