OmniSciDB  340b00dbf6
InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Fragmenter/InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
31 #include "LockMgr/LockMgr.h"
32 #include "Logger/Logger.h"
33 #include "Shared/checked_alloc.h"
34 #include "Shared/thread_count.h"
35 
36 #define DROP_FRAGMENT_FACTOR \
37  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
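// For example, with maxRows = 1,000,000, dropFragmentsToSize() keeps dropping whole
// fragments until the table holds 970,000 rows or fewer, leaving headroom before the
// next drop is triggered.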
38 
39 using Chunk_NS::Chunk;
42 
44 
45 using namespace std;
46 
47 namespace Fragmenter_Namespace {
48 
49 InsertOrderFragmenter::InsertOrderFragmenter(
50  const vector<int> chunkKeyPrefix,
51  vector<Chunk>& chunkVec,
52  Data_Namespace::DataMgr* dataMgr,
53  Catalog_Namespace::Catalog* catalog,
54  const int physicalTableId,
55  const int shard,
56  const size_t maxFragmentRows,
57  const size_t maxChunkSize,
58  const size_t pageSize,
59  const size_t maxRows,
60  const Data_Namespace::MemoryLevel defaultInsertLevel,
61  const bool uses_foreign_storage)
62  : chunkKeyPrefix_(chunkKeyPrefix)
63  , dataMgr_(dataMgr)
64  , catalog_(catalog)
65  , physicalTableId_(physicalTableId)
66  , shard_(shard)
67  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
68  , pageSize_(pageSize)
69  , numTuples_(0)
70  , maxFragmentId_(-1)
71  , maxChunkSize_(maxChunkSize)
72  , maxRows_(maxRows)
73  , fragmenterType_("insert_order")
74  , defaultInsertLevel_(defaultInsertLevel)
75  , uses_foreign_storage_(uses_foreign_storage)
76  , hasMaterializedRowId_(false)
77  , mutex_access_inmem_states(new std::mutex) {
78  // Note that the Fragmenter is not passed virtual columns and so should only
79  // find the rowid column if it is non-virtual
80 
81  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
82  int columnId = colIt->getColumnDesc()->columnId;
83  columnMap_[columnId] = *colIt;
84  if (colIt->getColumnDesc()->columnName == "rowid") {
85  hasMaterializedRowId_ = true;
86  rowIdColId_ = columnId;
87  }
88  }
89  getChunkMetadata();
90 }
91 
92 InsertOrderFragmenter::~InsertOrderFragmenter() {}
93 
94 namespace {
95 
100 int compute_device_for_fragment(const int table_id,
101  const int fragment_id,
102  const int num_devices) {
103  if (g_use_table_device_offset) {
104  return (table_id + fragment_id) % num_devices;
105  } else {
106  return fragment_id % num_devices;
107  }
108 }
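// Example: with num_devices = 2 and the table offset enabled, table_id 5 and
// fragment_id 3 map to device (5 + 3) % 2 = 0; with the offset disabled the same
// fragment maps to device 3 % 2 = 1.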
109 
110 } // namespace
111 
112 void InsertOrderFragmenter::getChunkMetadata() {
113  if (uses_foreign_storage_ ||
114  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
115  // memory-resident tables won't have anything on disk
116  ChunkMetadataVector chunk_metadata;
117  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
118 
119  // data comes like this - database_id, table_id, column_id, fragment_id
120  // but let's sort by database_id, table_id, fragment_id, column_id
121 
122  int fragment_subkey_index = 3;
123  std::sort(chunk_metadata.begin(),
124  chunk_metadata.end(),
125  [&](const auto& pair1, const auto& pair2) {
126  return pair1.first[3] < pair2.first[3];
127  });
128 
129  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
130  ++chunk_itr) {
131  int cur_column_id = chunk_itr->first[2];
132  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
133 
134  if (fragmentInfoVec_.empty() ||
135  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
136  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
137  CHECK(new_fragment_info);
138  maxFragmentId_ = cur_fragment_id;
139  new_fragment_info->fragmentId = cur_fragment_id;
140  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
141  numTuples_ += new_fragment_info->getPhysicalNumTuples();
142  for (const auto level_size : dataMgr_->levelSizes_) {
143  new_fragment_info->deviceIds.push_back(
144  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
145  }
146  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
147  new_fragment_info->physicalTableId = physicalTableId_;
148  new_fragment_info->shard = shard_;
149  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
150  } else {
151  if (chunk_itr->second->numElements !=
152  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
153  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
154  std::to_string(physicalTableId_) + ", Column " +
155  std::to_string(cur_column_id) + ". Fragment Tuples: " +
156  std::to_string(
157  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
158  ", Chunk Tuples: " +
159  std::to_string(chunk_itr->second->numElements);
160  }
161  }
162  CHECK(fragmentInfoVec_.back().get());
163  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
164  }
165  }
166 
167  size_t maxFixedColSize = 0;
168 
169  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
170  auto size = colIt->second.getColumnDesc()->columnType.get_size();
171  if (size == -1) { // variable length
172  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
173  size = 8; // b/c we use this for string and array indices - gross to have magic
174  // number here
175  }
176  CHECK_GE(size, 0);
177  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
178  }
179 
180  // this is maximum number of rows assuming everything is fixed length
181  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
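// e.g. if maxChunkSize_ is 1 GB (1073741824 bytes) and the widest fixed-width column
// is 8 bytes, a fragment is capped at 1073741824 / 8 = 134,217,728 rows, even if
// maxFragmentRows_ was configured higher.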
182 
183  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
184  // Now need to get the insert buffers for each column - should be last
185  // fragment
186  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
187  // TODO: add accessor here for safe indexing
188  int deviceId =
189  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
190  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
191  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
192  insertKey.push_back(colIt->first); // column id
193  insertKey.push_back(lastFragmentId); // fragment id
194  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
195  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
196  if (varLenColInfoIt != varLenColInfo_.end()) {
197  varLenColInfoIt->second = colIt->second.getBuffer()->size();
198  }
199  }
200  }
201 }
202 
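// Fragments are evicted oldest-first: whole fragments are popped from the front of
// fragmentInfoVec_ until numTuples_ falls to maxRows * DROP_FRAGMENT_FACTOR or below,
// so the table keeps its most recently inserted rows.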
203 void InsertOrderFragmenter::dropFragmentsToSize(const size_t maxRows) {
204  // not safe to call from outside insertData
205  // b/c depends on insertLock around numTuples_
206 
207  // don't ever drop the only fragment!
208  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
209  return;
210  }
211 
212  if (numTuples_ > maxRows) {
213  size_t preNumTuples = numTuples_;
214  vector<int> dropFragIds;
215  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
216  while (numTuples_ > targetRows) {
217  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
218  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
219  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
220  fragmentInfoVec_.pop_front();
221  CHECK_GE(numTuples_, numFragTuples);
222  numTuples_ -= numFragTuples;
223  }
224  deleteFragments(dropFragIds);
225  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
226  << " post: " << numTuples_ << " maxRows: " << maxRows;
227  }
228 }
229 
230 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
231  // Fix a verified loophole on sharded logical tables: the logical table is locked by
232  // its logical tableId, but it is the physical tables that reach this point when
233  // fragments overflow during COPY. Locks on a logical table and its physical tables
234  // never intersect, which allows races. It would be overkill to resolve a logical
235  // table to its physical tables in DBHandler, ParseNode or other higher layers where
236  // the logical table is locked with Table Read/Write locks; it is easier to lock the
237  // logical table on behalf of its physical tables here. A downside of this approach is
238  // the loss of parallel execution of deleteFragments across physical tables, but since
239  // deleteFragments is a short in-memory operation, that loss is not a big deal.
240  auto chunkKeyPrefix = chunkKeyPrefix_;
241  if (shard_ >= 0) {
242  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
243  }
244 
245  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
246  // SELECT and COPY may enter a deadlock
247  const auto delete_lock =
248  lockmgr::TableDataLockMgr::getWriteLockForTable(chunkKeyPrefix);
249 
250  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
251 
252  for (const auto fragId : dropFragIds) {
253  for (const auto& col : columnMap_) {
254  int colId = col.first;
255  vector<int> fragPrefix = chunkKeyPrefix_;
256  fragPrefix.push_back(colId);
257  fragPrefix.push_back(fragId);
258  dataMgr_->deleteChunksWithPrefix(fragPrefix);
259  }
260  }
261 }
262 
263 void InsertOrderFragmenter::updateColumnChunkMetadata(
264  const ColumnDescriptor* cd,
265  const int fragment_id,
266  const std::shared_ptr<ChunkMetadata> metadata) {
267  // synchronize concurrent accesses to fragmentInfoVec_
268  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
269 
270  CHECK(metadata.get());
271  auto fragment_info = getFragmentInfo(fragment_id);
272  CHECK(fragment_info);
273  fragment_info->setChunkMetadata(cd->columnId, metadata);
274 }
275 
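// Rewrites the per-fragment chunk statistics (min/max/nulls) for one column. For each
// fragment with an entry in stats_map the chunk's encoder is asked to reset its stats;
// if the encoder reports no change the fragment is skipped, otherwise the fragment's
// metadata map is refreshed and, for disk-resident tables, the buffer is marked dirty.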
276 void InsertOrderFragmenter::updateChunkStats(
277  const ColumnDescriptor* cd,
278  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map) {
279  // synchronize concurrent accesses to fragmentInfoVec_
280  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
286  if (shard_ >= 0) {
287  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
288  }
289 
290  CHECK(cd);
291  const auto column_id = cd->columnId;
292  const auto col_itr = columnMap_.find(column_id);
293  CHECK(col_itr != columnMap_.end());
294 
295  for (auto const& fragment : fragmentInfoVec_) {
296  auto stats_itr = stats_map.find(fragment->fragmentId);
297  if (stats_itr != stats_map.end()) {
298  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
299  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
300  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
301  physicalTableId_,
302  column_id,
303  fragment->fragmentId};
304  auto chunk = Chunk_NS::Chunk::getChunk(cd,
305  &catalog_->getDataMgr(),
306  chunk_key,
307  defaultInsertLevel_,
308  0,
309  chunk_meta_it->second->numBytes,
310  chunk_meta_it->second->numElements);
311  auto buf = chunk->getBuffer();
312  CHECK(buf);
313  if (!buf->hasEncoder()) {
314  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
315  }
316  auto encoder = buf->getEncoder();
317 
318  auto chunk_stats = stats_itr->second;
319 
320  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
321  encoder->getMetadata(old_chunk_metadata);
322  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
323 
324  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
325  // Use the logical type to display data, since the encoding should be ignored
326  const auto logical_ti = cd->columnType.is_dict_encoded_string()
327  ? SQLTypeInfo(kBIGINT)
328  : get_logical_type_info(cd->columnType);
329  if (!didResetStats) {
330  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
331  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
332  << DatumToString(chunk_stats.max, logical_ti);
333  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
334  << DatumToString(chunk_stats.min, logical_ti);
335  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
336  continue; // move to next fragment
337  }
338 
339  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
340  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
341  << DatumToString(chunk_stats.max, logical_ti);
342  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
343  << DatumToString(chunk_stats.min, logical_ti);
344  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
345 
346  // Reset fragment metadata map and set buffer to dirty
347  auto new_metadata = std::make_shared<ChunkMetadata>();
348  // Run through fillChunkStats to ensure any transformations to the raw metadata
349  // values get applied (e.g. for date in days)
350  encoder->getMetadata(new_metadata);
351 
352  fragment->setChunkMetadata(column_id, new_metadata);
353  fragment->shadowChunkMetadataMap =
354  fragment->getChunkMetadataMap(); // TODO(adb): needed?
355  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
356  buf->setDirty();
357  }
358  } else {
359  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
360  << ", table " << physicalTableId_
361  << ", column " << column_id;
362  }
363  }
364 }
365 
366 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
367  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
368  fragmentInfoVec_.end(),
369  [fragment_id](const auto& fragment) -> bool {
370  return fragment->fragmentId == fragment_id;
371  });
372  CHECK(fragment_it != fragmentInfoVec_.end());
373  return fragment_it->get();
374 }
375 
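// insertData() serializes concurrent inserts with insertMutex_, and checkpoints the
// table when the default insert level is DISK_LEVEL so there is no window for on-disk
// corruption. If anything throws, the catch block resets the table epochs through the
// catalog, which (as noted inside) destroys this fragmenter object, so the exception is
// simply re-thrown.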
376 void InsertOrderFragmenter::insertData(InsertData& insertDataStruct) {
377  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
378  try {
379  mapd_unique_lock<mapd_shared_mutex> insertLock(
380  insertMutex_); // prevent two threads from trying to insert into the same table
381  // simultaneously
382 
383  insertDataImpl(insertDataStruct);
384 
385  if (defaultInsertLevel_ ==
386  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
387  dataMgr_->checkpoint(
388  chunkKeyPrefix_[0],
389  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
390  }
391  } catch (...) {
392  auto table_epochs =
393  catalog_->getTableEpochs(insertDataStruct.databaseId, insertDataStruct.tableId);
394 
395  // the statement below deletes *this* object!
396  // relying on exception propagation at this stage
397  // until we can sort this out in a cleaner fashion
398  catalog_->setTableEpochs(insertDataStruct.databaseId, table_epochs);
399  throw;
400  }
401 }
402 
403 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insertDataStruct) {
404  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
405  mapd_unique_lock<mapd_shared_mutex> insertLock(
406  insertMutex_); // prevent two threads from trying to insert into the same table
407  // simultaneously
408  insertDataImpl(insertDataStruct);
409 }
410 
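// Used on the ALTER TABLE ADD COLUMN path (replicate_count > 0): the incoming data
// block for each non-bypassed column is appended once per existing fragment,
// recreating that fragment's chunk buffer, and the operation is rejected if a fragment
// cannot hold the new column's values within maxChunkSize_.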
411 void InsertOrderFragmenter::replicateData(const InsertData& insertDataStruct) {
412  // synchronize concurrent accesses to fragmentInfoVec_
413  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
414  size_t numRowsLeft = insertDataStruct.numRows;
415  for (auto const& fragmentInfo : fragmentInfoVec_) {
416  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
417  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
418  size_t numRowsCanBeInserted;
419  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
420  if (insertDataStruct.bypass[i]) {
421  continue;
422  }
423  auto columnId = insertDataStruct.columnIds[i];
424  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
425  CHECK(colDesc);
426  CHECK(columnMap_.find(columnId) != columnMap_.end());
427 
428  ChunkKey chunkKey = chunkKeyPrefix_;
429  chunkKey.push_back(columnId);
430  chunkKey.push_back(fragmentInfo->fragmentId);
431 
432  auto colMapIt = columnMap_.find(columnId);
433  auto& chunk = colMapIt->second;
434  if (chunk.isChunkOnDevice(
435  dataMgr_,
436  chunkKey,
437  defaultInsertLevel_,
438  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
439  dataMgr_->deleteChunksWithPrefix(chunkKey);
440  }
441  chunk.createChunkBuffer(
442  dataMgr_,
443  chunkKey,
444  defaultInsertLevel_,
445  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
446  chunk.initEncoder();
447 
448  try {
449  DataBlockPtr dataCopy = insertDataStruct.data[i];
450  auto size = colDesc->columnType.get_size();
451  if (0 > size) {
452  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
453  varLenColInfo_[columnId] = 0;
454  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
455  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
456  } else {
457  numRowsCanBeInserted = maxChunkSize_ / size;
458  }
459 
460  // FIXME: abort a case in which new column is wider than existing columns
461  if (numRowsCanBeInserted < numRowsToInsert) {
462  throw std::runtime_error("new column '" + colDesc->columnName +
463  "' wider than existing columns is not supported");
464  }
465 
466  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
467  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
468 
469  // update total size of var-len column in (actually the last) fragment
470  if (0 > size) {
471  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
472  varLenColInfo_[columnId] = chunk.getBuffer()->size();
473  }
474  } catch (...) {
475  dataMgr_->deleteChunksWithPrefix(chunkKey);
476  throw;
477  }
478  }
479  numRowsLeft -= numRowsToInsert;
480  }
481  CHECK(0 == numRowsLeft);
482 
483  for (auto const& fragmentInfo : fragmentInfoVec_) {
484  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
485  }
486 }
487 
488 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
489  // prevent concurrent insert rows and drop column
490  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
491  // synchronize concurrent accesses to fragmentInfoVec_
492  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
493  for (auto const& fragmentInfo : fragmentInfoVec_) {
494  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
495  }
496 
497  for (const auto columnId : columnIds) {
498  auto cit = columnMap_.find(columnId);
499  if (columnMap_.end() != cit) {
500  columnMap_.erase(cit);
501  }
502 
503  vector<int> fragPrefix = chunkKeyPrefix_;
504  fragPrefix.push_back(columnId);
505  dataMgr_->deleteChunksWithPrefix(fragPrefix);
506 
507  for (const auto& fragmentInfo : fragmentInfoVec_) {
508  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
509  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
510  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
511  }
512  }
513  }
514  for (const auto& fragmentInfo : fragmentInfoVec_) {
515  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
516  }
517 }
518 
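// Inspects the chunk statistics of the given delete column in every fragment: a max
// value of 1 means at least one row in that fragment has been marked deleted.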
519 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
520  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
521 
522  for (auto const& fragment : fragmentInfoVec_) {
523  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
524  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
525  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
526  if (chunk_stats.max.tinyintval == 1) {
527  return true;
528  }
529  }
530  return false;
531 }
532 
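// Core insert path: rows are appended to the last open fragment; when it fills up (by
// row count or by var-len byte budget) a new fragment is created and filling continues.
// Each column's data is appended to its insert chunk, a materialized rowid column is
// generated when present, and only the final fragment-info update is taken under
// fragmentInfoMutex_ to avoid deadlocks with concurrent SELECTs.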
533 void InsertOrderFragmenter::insertDataImpl(InsertData& insertDataStruct) {
534  // populate the deleted system column if it should exist, as it will not come from the client
535  // Do not add this magical column in the replicate ALTER TABLE ADD route as
536  // it is not needed and will cause issues
537  std::unique_ptr<int8_t[]> data_for_deleted_column;
538  for (const auto& cit : columnMap_) {
539  if (cit.second.getColumnDesc()->isDeletedCol &&
540  insertDataStruct.replicate_count == 0) {
541  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
542  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
543  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
544  insertDataStruct.columnIds.push_back(cit.second.getColumnDesc()->columnId);
545  break;
546  }
547  }
548  // MAT we need to add a removal of the empty column we pushed onto here
549  // for upstream safety. Should not be a problem but need to fix.
550 
551  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
552  for (const auto columnId : insertDataStruct.columnIds) {
553  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
554  CHECK(columnDesc);
555  if (columnMap_.end() == columnMap_.find(columnId)) {
556  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
557  }
558  }
559 
560  // when replicating (adding) column(s), this path won't work; go the separate route below
561  if (insertDataStruct.replicate_count > 0) {
562  replicateData(insertDataStruct);
563  return;
564  }
565 
566  std::unordered_map<int, int> inverseInsertDataColIdMap;
567  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
568  inverseInsertDataColIdMap.insert(
569  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
570  }
571 
572  size_t numRowsLeft = insertDataStruct.numRows;
573  size_t numRowsInserted = 0;
574  vector<DataBlockPtr> dataCopy =
575  insertDataStruct.data; // bc append data will move ptr forward and this violates
576  // constness of InsertData
577  if (numRowsLeft <= 0) {
578  return;
579  }
580 
581  FragmentInfo* currentFragment{nullptr};
582 
583  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
584  currentFragment = createNewFragment(defaultInsertLevel_);
585  } else {
586  currentFragment = fragmentInfoVec_.back().get();
587  }
588  CHECK(currentFragment);
589 
590  size_t startFragment = fragmentInfoVec_.size() - 1;
591 
592  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
593  // loop until done inserting all rows
594  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
595  size_t rowsLeftInCurrentFragment =
596  maxFragmentRows_ - currentFragment->shadowNumTuples;
597  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
598  if (rowsLeftInCurrentFragment != 0) {
599  for (auto& varLenColInfoIt : varLenColInfo_) {
600  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
601  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
602  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
603  if (insertIdIt != inverseInsertDataColIdMap.end()) {
604  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
605  numRowsToInsert = std::min(
606  numRowsToInsert,
607  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
608  numRowsToInsert,
609  numRowsInserted,
610  bytesLeft));
611  }
612  }
613  }
614 
615  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
616  currentFragment = createNewFragment(defaultInsertLevel_);
617  if (numRowsInserted == 0) {
618  startFragment++;
619  }
620  rowsLeftInCurrentFragment = maxFragmentRows_;
621  for (auto& varLenColInfoIt : varLenColInfo_) {
622  varLenColInfoIt.second = 0; // reset byte counter
623  }
624  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
625  for (auto& varLenColInfoIt : varLenColInfo_) {
626  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
627  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
628  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
629  if (insertIdIt != inverseInsertDataColIdMap.end()) {
630  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
631  numRowsToInsert = std::min(
632  numRowsToInsert,
633  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
634  numRowsToInsert,
635  numRowsInserted,
636  bytesLeft));
637  }
638  }
639  }
640 
641  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
642  // never be able to insert anything
643 
644  // for each column, append the data in the appropriate insert buffer
645  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
646  int columnId = insertDataStruct.columnIds[i];
647  auto colMapIt = columnMap_.find(columnId);
648  CHECK(colMapIt != columnMap_.end());
649  currentFragment->shadowChunkMetadataMap[columnId] =
650  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
651  auto varLenColInfoIt = varLenColInfo_.find(columnId);
652  if (varLenColInfoIt != varLenColInfo_.end()) {
653  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
654  }
655  }
656  if (hasMaterializedRowId_) {
657  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
658  currentFragment->shadowNumTuples;
659  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
660  for (size_t i = 0; i < numRowsToInsert; ++i) {
661  row_id_data[i] = i + startId;
662  }
663  DataBlockPtr rowIdBlock;
664  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
665  auto colMapIt = columnMap_.find(rowIdColId_);
666  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
667  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
668  }
669 
670  currentFragment->shadowNumTuples =
671  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
672  numRowsLeft -= numRowsToInsert;
673  numRowsInserted += numRowsToInsert;
674  }
675  {
676  // Only take the fragment info lock when updating fragment info map. Otherwise,
677  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
678  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
679  // COPY_FROM waits for TableWriteLock
680  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
681  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
682  partIt != fragmentInfoVec_.end();
683  ++partIt) {
684  auto fragment_ptr = partIt->get();
685  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
686  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
687  }
688  }
689  numTuples_ += insertDataStruct.numRows;
690  dropFragmentsToSize(maxRows_);
691 }
692 
693 FragmentInfo* InsertOrderFragmenter::createNewFragment(
694  const Data_Namespace::MemoryLevel memoryLevel) {
695  // also sets the new fragment as the insertBuffer for each column
696 
697  maxFragmentId_++;
698  auto newFragmentInfo = std::make_unique<FragmentInfo>();
699  newFragmentInfo->fragmentId = maxFragmentId_;
700  newFragmentInfo->shadowNumTuples = 0;
701  newFragmentInfo->setPhysicalNumTuples(0);
702  for (const auto levelSize : dataMgr_->levelSizes_) {
703  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
704  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
705  }
706  newFragmentInfo->physicalTableId = physicalTableId_;
707  newFragmentInfo->shard = shard_;
708 
709  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
710  colMapIt != columnMap_.end();
711  ++colMapIt) {
712  ChunkKey chunkKey = chunkKeyPrefix_;
713  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
714  chunkKey.push_back(maxFragmentId_);
715  colMapIt->second.createChunkBuffer(
716  dataMgr_,
717  chunkKey,
718  memoryLevel,
719  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
720  pageSize_);
721  colMapIt->second.initEncoder();
722  }
723 
724  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
725  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
726  return fragmentInfoVec_.back().get();
727 }
728 
729 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
730  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
731  TableInfo queryInfo;
732  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
733  // right now we don't test predicate, so just return (copy of) all fragments
734  bool fragmentsExist = false;
735  if (fragmentInfoVec_.empty()) {
736  // If we have no fragments add a dummy empty fragment to make the executor
737  // not have separate logic for 0-row tables
738  int maxFragmentId = 0;
739  FragmentInfo emptyFragmentInfo;
740  emptyFragmentInfo.fragmentId = maxFragmentId;
741  emptyFragmentInfo.shadowNumTuples = 0;
742  emptyFragmentInfo.setPhysicalNumTuples(0);
743  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
744  emptyFragmentInfo.physicalTableId = physicalTableId_;
745  emptyFragmentInfo.shard = shard_;
746  queryInfo.fragments.push_back(emptyFragmentInfo);
747  } else {
748  fragmentsExist = true;
749  std::for_each(
750  fragmentInfoVec_.begin(),
751  fragmentInfoVec_.end(),
752  [&queryInfo](const auto& fragment_owned_ptr) {
753  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
754  });
755  }
756  readLock.unlock();
757  queryInfo.setPhysicalNumTuples(0);
758  auto partIt = queryInfo.fragments.begin();
759  if (fragmentsExist) {
760  while (partIt != queryInfo.fragments.end()) {
761  if (partIt->getPhysicalNumTuples() == 0) {
762  // this means that a concurrent insert query inserted tuples into a new fragment
763  // but when the query came in we didn't have this fragment. To make sure we don't
764  // mess up the executor we delete this fragment from the metadatamap (fixes
765  // earlier bug found 2015-05-08)
766  partIt = queryInfo.fragments.erase(partIt);
767  } else {
768  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
769  partIt->getPhysicalNumTuples());
770  ++partIt;
771  }
772  }
773  } else {
774  // We added a dummy fragment and know the table is empty
775  queryInfo.setPhysicalNumTuples(0);
776  }
777  return queryInfo;
778 }
779 
780 } // namespace Fragmenter_Namespace