OmniSciDB  7bf56492aa
InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
31 #include "LockMgr/LockMgr.h"
32 #include "Shared/Logger.h"
33 #include "Shared/checked_alloc.h"
34 #include "Shared/thread_count.h"
35 
36 #define DROP_FRAGMENT_FACTOR \
37  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
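// Illustrative note, not part of the original source: dropFragmentsToSize() below trims
// to DROP_FRAGMENT_FACTOR * maxRows rather than to maxRows itself, so the next insert
// does not immediately push the table back over the cap. With a hypothetical cap of
// 1,000,000 rows:
//
//   size_t max_rows = 1000000;
//   size_t target_rows = max_rows * DROP_FRAGMENT_FACTOR;  // 970,000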
38 
39 using Chunk_NS::Chunk;
40 using Data_Namespace::AbstractBuffer;
41 using Data_Namespace::DataMgr;
42 
43 using namespace std;
44 
45 namespace Fragmenter_Namespace {
46 
47 InsertOrderFragmenter::InsertOrderFragmenter(
48  const vector<int> chunkKeyPrefix,
49  vector<Chunk>& chunkVec,
50  Data_Namespace::DataMgr* dataMgr,
51  Catalog_Namespace::Catalog* catalog,
52  const int physicalTableId,
53  const int shard,
54  const size_t maxFragmentRows,
55  const size_t maxChunkSize,
56  const size_t pageSize,
57  const size_t maxRows,
58  const Data_Namespace::MemoryLevel defaultInsertLevel,
59  const bool uses_foreign_storage)
60  : chunkKeyPrefix_(chunkKeyPrefix)
61  , dataMgr_(dataMgr)
62  , catalog_(catalog)
63  , physicalTableId_(physicalTableId)
64  , shard_(shard)
65  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
66  , pageSize_(pageSize)
67  , numTuples_(0)
68  , maxFragmentId_(-1)
69  , maxChunkSize_(maxChunkSize)
70  , maxRows_(maxRows)
71  , fragmenterType_("insert_order")
72  , defaultInsertLevel_(defaultInsertLevel)
73  , uses_foreign_storage_(uses_foreign_storage)
74  , hasMaterializedRowId_(false)
75  , mutex_access_inmem_states(new std::mutex) {
76  // Note that Fragmenter is not passed virtual columns and so should only
77  // find row id column if it is non virtual
78 
79  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
80  int columnId = colIt->get_column_desc()->columnId;
81  columnMap_[columnId] = *colIt;
82  if (colIt->get_column_desc()->columnName == "rowid") {
83  hasMaterializedRowId_ = true;
84  rowIdColId_ = columnId;
85  }
86  }
87  getChunkMetadata();
88 }
89 
90 InsertOrderFragmenter::~InsertOrderFragmenter() {}
91 
92 void InsertOrderFragmenter::getChunkMetadata() {
93  if (uses_foreign_storage_ ||
94  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
95  // memory-resident tables won't have anything on disk
96  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
97  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
98 
99  // data comes keyed as database_id, table_id, column_id, fragment_id;
100  // sort by fragment_id so all chunks of a fragment are grouped together
101 
102  int fragment_subkey_index = 3;
103  std::sort(chunk_metadata.begin(),
104  chunk_metadata.end(),
105  [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
106  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
107  return pair1.first[fragment_subkey_index] < pair2.first[fragment_subkey_index];
108  });
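  // Added example, not in the original source: a ChunkKey is a std::vector<int> of the
  // form {database_id, table_id, column_id, fragment_id}; e.g. {1, 7, 2, 0} and
  // {1, 7, 3, 0} are columns 2 and 3 of fragment 0. Ordering on index 3 groups all
  // chunks of a fragment together, which the loop below relies on when it creates one
  // FragmentInfo per fragment id.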
109 
110  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
111  ++chunk_itr) {
112  int cur_column_id = chunk_itr->first[2];
113  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
114 
115  if (fragmentInfoVec_.empty() ||
116  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
117  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
118  CHECK(new_fragment_info);
119  maxFragmentId_ = cur_fragment_id;
120  new_fragment_info->fragmentId = cur_fragment_id;
121  new_fragment_info->setPhysicalNumTuples(chunk_itr->second.numElements);
122  numTuples_ += new_fragment_info->getPhysicalNumTuples();
123  for (const auto level_size : dataMgr_->levelSizes_) {
124  new_fragment_info->deviceIds.push_back(cur_fragment_id % level_size);
125  }
126  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
127  new_fragment_info->physicalTableId = physicalTableId_;
128  new_fragment_info->shard = shard_;
129  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
130  } else {
131  if (chunk_itr->second.numElements !=
132  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
133  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
134  std::to_string(physicalTableId_) + ", Column " +
135  std::to_string(cur_column_id) + ". Fragment Tuples: " +
136  std::to_string(
137  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
138  ", Chunk Tuples: " +
139  std::to_string(chunk_itr->second.numElements);
140  }
141  }
142  CHECK(fragmentInfoVec_.back().get());
143  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
144  }
145  }
146 
147  ssize_t maxFixedColSize = 0;
148 
149  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
150  ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
151  if (size == -1) { // variable length
152  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
153  size = 8; // b/c we use this for string and array indices - gross to have magic
154  // number here
155  }
156  maxFixedColSize = std::max(maxFixedColSize, size);
157  }
158 
159  // this is the maximum number of rows, assuming everything is fixed length
160  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
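  // Added example, not in the original source: with a hypothetical maxChunkSize_ of
  // 1 GB (1073741824 bytes) and a widest fixed-width column of 8 bytes, the cap above
  // is 134217728 rows, which may lower the configured maxFragmentRows_.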
161 
162  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
163  // Now need to get the insert buffers for each column - should be last
164  // fragment
165  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
166  // TODO: add accessor here for safe indexing
167  int deviceId =
168  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
169  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
170  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
171  insertKey.push_back(colIt->first); // column id
172  insertKey.push_back(lastFragmentId); // fragment id
173  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
174  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
175  if (varLenColInfoIt != varLenColInfo_.end()) {
176  varLenColInfoIt->second = colIt->second.get_buffer()->size();
177  }
178  }
179  }
180 }
181 
182 void InsertOrderFragmenter::dropFragmentsToSize(const size_t maxRows) {
183  // not safe to call from outside insertData
184  // b/c depends on insertLock around numTuples_
185 
186  // don't ever drop the only fragment!
187  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
188  return;
189  }
190 
191  if (numTuples_ > maxRows) {
192  size_t preNumTuples = numTuples_;
193  vector<int> dropFragIds;
194  size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
195  while (numTuples_ > targetRows) {
196  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
197  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
198  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
199  fragmentInfoVec_.pop_front();
200  CHECK_GE(numTuples_, numFragTuples);
201  numTuples_ -= numFragTuples;
202  }
203  deleteFragments(dropFragIds);
204  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
205  << " post: " << numTuples_ << " maxRows: " << maxRows;
206  }
207 }
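// Minimal standalone sketch, not part of the original source: the trimming policy above
// expressed over plain row counts. Oldest fragments are dropped first, the target is
// DROP_FRAGMENT_FACTOR * maxRows rather than maxRows itself, and the "never drop the
// only fragment" rule is folded into the loop guard. The helper name is hypothetical.
namespace {
[[maybe_unused]] size_t rows_after_trim_example(std::vector<size_t> fragment_sizes,
                                                size_t num_tuples,
                                                const size_t max_rows) {
  if (num_tuples <= max_rows) {
    return num_tuples;  // only trim once the hard cap is exceeded
  }
  const size_t target_rows = max_rows * DROP_FRAGMENT_FACTOR;
  while (num_tuples > target_rows && fragment_sizes.size() > 1) {
    num_tuples -= fragment_sizes.front();          // oldest fragment goes first
    fragment_sizes.erase(fragment_sizes.begin());  // drop it
  }
  return num_tuples;
}
}  // namespace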
208 
209 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
210  // Fix a verified loophole on sharded logical tables, which are locked using the
211  // logical tableId while it is their physical tables that reach here when fragments
212  // overflow during COPY. Locks on a logical table and its physical tables never
213  // intersect, which means potential races. It would be overkill to resolve a logical
214  // table to its physical tables in MapDHandler, ParseNode or other higher layers where
215  // the logical table is locked with Table Read/Write locks; it is easier to lock the
216  // logical table on behalf of its physical tables. A downside of this approach may be
217  // the loss of parallel execution of deleteFragments across physical tables. Because
218  // deleteFragments is a short in-memory operation, that loss seems acceptable.
219  auto chunkKeyPrefix = chunkKeyPrefix_;
220  if (shard_ >= 0) {
221  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
222  }
223 
224  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
225  // SELECT and COPY may enter a deadlock
226  const auto delete_lock =
227  lockmgr::TableDataLockMgr::getWriteLockForTable(chunkKeyPrefix);
228 
229  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
230 
231  for (const auto fragId : dropFragIds) {
232  for (const auto& col : columnMap_) {
233  int colId = col.first;
234  vector<int> fragPrefix = chunkKeyPrefix_;
235  fragPrefix.push_back(colId);
236  fragPrefix.push_back(fragId);
237  dataMgr_->deleteChunksWithPrefix(fragPrefix);
238  }
239  }
240 }
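// Added note, not in the original source: deleteChunksWithPrefix() removes every chunk
// whose key begins with the given prefix. Here the prefix is fully qualified,
// {database_id, table_id, column_id, fragment_id}, so one chunk per column is removed
// for each dropped fragment; dropColumns() below passes only
// {database_id, table_id, column_id}, removing that column's chunks across all fragments.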
241 
242 void InsertOrderFragmenter::updateChunkStats(
243  const ColumnDescriptor* cd,
244  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map) {
245  // synchronize concurrent accesses to fragmentInfoVec_
246  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
252  if (shard_ >= 0) {
253  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
254  }
255 
256  CHECK(cd);
257  const auto column_id = cd->columnId;
258  const auto col_itr = columnMap_.find(column_id);
259  CHECK(col_itr != columnMap_.end());
260 
261  for (auto const& fragment : fragmentInfoVec_) {
262  auto stats_itr = stats_map.find(fragment->fragmentId);
263  if (stats_itr != stats_map.end()) {
264  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
265  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
266  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
267  physicalTableId_,
268  column_id,
269  fragment->fragmentId};
270  auto chunk = Chunk_NS::Chunk::getChunk(cd,
271  &catalog_->getDataMgr(),
272  chunk_key,
273  defaultInsertLevel_,
274  0,
275  chunk_meta_it->second.numBytes,
276  chunk_meta_it->second.numElements);
277  auto buf = chunk->get_buffer();
278  CHECK(buf);
279  auto encoder = buf->encoder.get();
280  if (!encoder) {
281  throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
282  }
283 
284  auto chunk_stats = stats_itr->second;
285 
286  ChunkMetadata old_chunk_metadata;
287  encoder->getMetadata(old_chunk_metadata);
288  auto& old_chunk_stats = old_chunk_metadata.chunkStats;
289 
290  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
291  // Use the logical type to display data, since the encoding should be ignored
292  const auto logical_ti = cd->columnType.is_dict_encoded_string()
293  ? SQLTypeInfo(kBIGINT)
294  : get_logical_type_info(cd->columnType);
295  if (!didResetStats) {
296  VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
297  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
298  << DatumToString(chunk_stats.max, logical_ti);
299  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
300  << DatumToString(chunk_stats.min, logical_ti);
301  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
302  continue; // move to next fragment
303  }
304 
305  VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
306  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
307  << DatumToString(chunk_stats.max, logical_ti);
308  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
309  << DatumToString(chunk_stats.min, logical_ti);
310  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
311 
312  // Reset fragment metadata map and set buffer to dirty
313  ChunkMetadata new_metadata;
314  // Run through fillChunkStats to ensure any transformations to the raw metadata
315  // values get applied (e.g. for date in days)
316  encoder->getMetadata(new_metadata);
317 
318  fragment->setChunkMetadata(column_id, new_metadata);
319  fragment->shadowChunkMetadataMap =
320  fragment->getChunkMetadataMap(); // TODO(adb): needed?
321  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
322  buf->setDirty();
323  }
324  } else {
325  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
326  << ", table " << physicalTableId_ << ", "
327  << ", column " << column_id;
328  }
329  }
330 }
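// Hypothetical usage sketch, not part of the original source: narrowing the recorded
// min/max for one column across two fragments. ChunkStats carries Datum min/max plus a
// has_nulls flag, and the map is keyed by fragment id; the fragmenter and column
// descriptor variables below are assumed to exist.
//
//   std::unordered_map<int, ChunkStats> stats_map;
//   ChunkStats stats;
//   stats.min.bigintval = 0;
//   stats.max.bigintval = 41;
//   stats.has_nulls = false;
//   stats_map[0] = stats;  // fragment 0
//   stats_map[1] = stats;  // fragment 1
//   fragmenter->updateChunkStats(cd, stats_map);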
331 
332 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
333  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
334  fragmentInfoVec_.end(),
335  [fragment_id](const auto& fragment) -> bool {
336  return fragment->fragmentId == fragment_id;
337  });
338  CHECK(fragment_it != fragmentInfoVec_.end());
339  return fragment_it->get();
340 }
341 
342 void InsertOrderFragmenter::insertData(InsertData& insertDataStruct) {
343  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
344  try {
345  mapd_unique_lock<mapd_shared_mutex> insertLock(
346  insertMutex_); // prevent two threads from trying to insert into the same table
347  // simultaneously
348 
349  insertDataImpl(insertDataStruct);
350 
351  if (defaultInsertLevel_ ==
352  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
353  dataMgr_->checkpoint(
354  chunkKeyPrefix_[0],
355  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
356  }
357  } catch (...) {
358  int32_t tableEpoch =
359  catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);
360 
361  // the statement below deletes *this* object!
362  // relying on exception propagation at this stage
363  // until we can sort this out in a cleaner fashion
364  catalog_->setTableEpoch(
365  insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
366  throw;
367  }
368 }
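// Hypothetical usage sketch, not part of the original source: inserting three rows of a
// single BIGINT column. The database, table, and column ids are made up, and `fragmenter`
// is assumed to be an InsertOrderFragmenter for that table.
//
//   int64_t values[] = {10, 20, 30};
//   DataBlockPtr block;
//   block.numbersPtr = reinterpret_cast<int8_t*>(values);
//
//   InsertData insert_data;
//   insert_data.databaseId = 1;
//   insert_data.tableId = 7;
//   insert_data.columnIds = {2};
//   insert_data.data = {block};
//   insert_data.numRows = 3;
//   fragmenter->insertData(insert_data);  // checkpoints if the table is disk-resident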
369 
370 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insertDataStruct) {
371  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
372  mapd_unique_lock<mapd_shared_mutex> insertLock(
373  insertMutex_); // prevent two threads from trying to insert into the same table
374  // simultaneously
375  insertDataImpl(insertDataStruct);
376 }
377 
378 void InsertOrderFragmenter::replicateData(const InsertData& insertDataStruct) {
379  // synchronize concurrent accesses to fragmentInfoVec_
380  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
381  size_t numRowsLeft = insertDataStruct.numRows;
382  for (auto const& fragmentInfo : fragmentInfoVec_) {
383  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
384  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
385  size_t numRowsCanBeInserted;
386  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
387  if (insertDataStruct.bypass[i]) {
388  continue;
389  }
390  auto columnId = insertDataStruct.columnIds[i];
391  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
392  CHECK(colDesc);
393  CHECK(columnMap_.find(columnId) != columnMap_.end());
394 
395  ChunkKey chunkKey = chunkKeyPrefix_;
396  chunkKey.push_back(columnId);
397  chunkKey.push_back(fragmentInfo->fragmentId);
398 
399  auto colMapIt = columnMap_.find(columnId);
400  auto& chunk = colMapIt->second;
401  if (chunk.isChunkOnDevice(
402  dataMgr_,
403  chunkKey,
404  defaultInsertLevel_,
405  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
406  dataMgr_->deleteChunksWithPrefix(chunkKey);
407  }
408  chunk.createChunkBuffer(
409  dataMgr_,
410  chunkKey,
411  defaultInsertLevel_,
412  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
413  chunk.init_encoder();
414 
415  try {
416  DataBlockPtr dataCopy = insertDataStruct.data[i];
417  auto size = colDesc->columnType.get_size();
418  if (0 > size) {
419  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
420  varLenColInfo_[columnId] = 0;
421  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
422  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
423  } else {
424  numRowsCanBeInserted = maxChunkSize_ / size;
425  }
426 
427  // FIXME: abort a case in which new column is wider than existing columns
428  if (numRowsCanBeInserted < numRowsToInsert) {
429  throw std::runtime_error("new column '" + colDesc->columnName +
430  "' wider than existing columns is not supported");
431  }
432 
433  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
434  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
435 
436  // update total size of var-len column in (actually the last) fragment
437  if (0 > size) {
438  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
439  varLenColInfo_[columnId] = chunk.get_buffer()->size();
440  }
441  } catch (...) {
442  dataMgr_->deleteChunksWithPrefix(chunkKey);
443  throw;
444  }
445  }
446  numRowsLeft -= numRowsToInsert;
447  }
448  CHECK(0 == numRowsLeft);
449 
450  for (auto const& fragmentInfo : fragmentInfoVec_) {
451  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
452  }
453 }
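// Added note, not in the original source: this path backfills a newly added (ALTER TABLE
// ADD) column into every existing fragment, appending exactly getPhysicalNumTuples()
// replicated values per fragment so the new chunk lines up row-for-row with the
// fragment's existing chunks.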
454 
455 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
456  // prevent concurrent insert rows and drop column
457  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
458  // synchronize concurrent accesses to fragmentInfoVec_
459  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
460  for (auto const& fragmentInfo : fragmentInfoVec_) {
461  fragmentInfo->shadowChunkMetadataMap = fragmentInfo->getChunkMetadataMapPhysical();
462  }
463 
464  for (const auto columnId : columnIds) {
465  auto cit = columnMap_.find(columnId);
466  if (columnMap_.end() != cit) {
467  columnMap_.erase(cit);
468  }
469 
470  vector<int> fragPrefix = chunkKeyPrefix_;
471  fragPrefix.push_back(columnId);
472  dataMgr_->deleteChunksWithPrefix(fragPrefix);
473 
474  for (const auto& fragmentInfo : fragmentInfoVec_) {
475  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
476  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
477  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
478  }
479  }
480  }
481  for (const auto& fragmentInfo : fragmentInfoVec_) {
482  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
483  }
484 }
485 
486 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
487  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
488 
489  for (auto const& fragment : fragmentInfoVec_) {
490  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
491  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
492  const auto& chunk_stats = chunk_meta_it->second.chunkStats;
493  if (chunk_stats.max.tinyintval == 1) {
494  return true;
495  }
496  }
497  return false;
498 }
499 
500 void InsertOrderFragmenter::insertDataImpl(InsertData& insertDataStruct) {
501  // populate the deleted system column if it exists, as it will not come from the client
502  // Do not add this magical column in the replicate ALTER TABLE ADD route as
503  // it is not needed and will cause issues
504  std::unique_ptr<int8_t[]> data_for_deleted_column;
505  for (const auto& cit : columnMap_) {
506  if (cit.second.get_column_desc()->isDeletedCol &&
507  insertDataStruct.replicate_count == 0) {
508  data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
509  memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
510  insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
511  insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
512  break;
513  }
514  }
515  // MAT we need to add a removal of the empty column we pushed onto here
516  // for upstream safety. Should not be a problem but need to fix.
517 
518  // insert column to columnMap_ if not yet (ALTER ADD COLUMN)
519  for (const auto columnId : insertDataStruct.columnIds) {
520  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
521  CHECK(columnDesc);
522  if (columnMap_.end() == columnMap_.find(columnId)) {
523  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
524  }
525  }
526 
527  // when replicating (ALTER ADD) column(s), this path won't work; take the separate replicateData route
528  if (insertDataStruct.replicate_count > 0) {
529  replicateData(insertDataStruct);
530  return;
531  }
532 
533  std::unordered_map<int, int> inverseInsertDataColIdMap;
534  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
535  inverseInsertDataColIdMap.insert(
536  std::make_pair(insertDataStruct.columnIds[insertId], insertId));
537  }
538 
539  size_t numRowsLeft = insertDataStruct.numRows;
540  size_t numRowsInserted = 0;
541  vector<DataBlockPtr> dataCopy =
542  insertDataStruct.data; // bc append data will move ptr forward and this violates
543  // constness of InsertData
544  if (numRowsLeft <= 0) {
545  return;
546  }
547 
548  FragmentInfo* currentFragment{nullptr};
549 
550  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
551  currentFragment = createNewFragment(defaultInsertLevel_);
552  } else {
553  currentFragment = fragmentInfoVec_.back().get();
554  }
555  CHECK(currentFragment);
556 
557  size_t startFragment = fragmentInfoVec_.size() - 1;
558 
559  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
560  // loop until done inserting all rows
561  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
562  size_t rowsLeftInCurrentFragment =
563  maxFragmentRows_ - currentFragment->shadowNumTuples;
564  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
565  if (rowsLeftInCurrentFragment != 0) {
566  for (auto& varLenColInfoIt : varLenColInfo_) {
567  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
568  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
569  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
570  if (insertIdIt != inverseInsertDataColIdMap.end()) {
571  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
572  numRowsToInsert = std::min(
573  numRowsToInsert,
574  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
575  numRowsToInsert,
576  numRowsInserted,
577  bytesLeft));
578  }
579  }
580  }
581 
582  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
583  currentFragment = createNewFragment(defaultInsertLevel_);
584  if (numRowsInserted == 0) {
585  startFragment++;
586  }
587  rowsLeftInCurrentFragment = maxFragmentRows_;
588  for (auto& varLenColInfoIt : varLenColInfo_) {
589  varLenColInfoIt.second = 0; // reset byte counter
590  }
591  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
592  for (auto& varLenColInfoIt : varLenColInfo_) {
593  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
594  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
595  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
596  if (insertIdIt != inverseInsertDataColIdMap.end()) {
597  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
598  numRowsToInsert = std::min(
599  numRowsToInsert,
600  colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
601  numRowsToInsert,
602  numRowsInserted,
603  bytesLeft));
604  }
605  }
606  }
607 
608  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
609  // never be able to insert anything
610 
611  // for each column, append the data in the appropriate insert buffer
612  for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
613  int columnId = insertDataStruct.columnIds[i];
614  auto colMapIt = columnMap_.find(columnId);
615  CHECK(colMapIt != columnMap_.end());
616  currentFragment->shadowChunkMetadataMap[columnId] =
617  colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
618  auto varLenColInfoIt = varLenColInfo_.find(columnId);
619  if (varLenColInfoIt != varLenColInfo_.end()) {
620  varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
621  }
622  }
623  if (hasMaterializedRowId_) {
624  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
625  currentFragment->shadowNumTuples;
626  int64_t* rowIdData = new int64_t[numRowsToInsert];
627  for (size_t i = 0; i < numRowsToInsert; ++i) {
628  rowIdData[i] = i + startId;
629  }
630  DataBlockPtr rowIdBlock;
631  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData);
632  auto colMapIt = columnMap_.find(rowIdColId_);
633  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
634  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
635  delete[] rowIdData;
636  }
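  // Added note, not in the original source: materialized rowids are assigned as
  // startId = maxFragmentRows_ * fragmentId + shadowNumTuples, so each fragment owns a
  // disjoint id range; e.g. with maxFragmentRows_ = 32,000,000 the first row appended
  // to fragment 2 gets rowid 64,000,000.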
637 
638  currentFragment->shadowNumTuples =
639  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
640  numRowsLeft -= numRowsToInsert;
641  numRowsInserted += numRowsToInsert;
642  }
643  {
644  // Only take the fragment info lock when updating fragment info map. Otherwise,
645  // potential deadlock can occur after SELECT has locked TableReadLock and COPY_FROM
646  // has locked fragmentInfoMutex_ while SELECT waits for fragmentInfoMutex_ and
647  // COPY_FROM waits for TableWriteLock
648  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
649  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
650  partIt != fragmentInfoVec_.end();
651  ++partIt) {
652  auto fragment_ptr = partIt->get();
653  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
654  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
655  }
656  }
657  numTuples_ += insertDataStruct.numRows;
658  dropFragmentsToSize(maxRows_);
659 }
660 
661 FragmentInfo* InsertOrderFragmenter::createNewFragment(
662  const Data_Namespace::MemoryLevel memoryLevel) {
663  // also sets the new fragment as the insertBuffer for each column
664 
665  maxFragmentId_++;
666  auto newFragmentInfo = std::make_unique<FragmentInfo>();
667  newFragmentInfo->fragmentId = maxFragmentId_;
668  newFragmentInfo->shadowNumTuples = 0;
669  newFragmentInfo->setPhysicalNumTuples(0);
670  for (const auto levelSize : dataMgr_->levelSizes_) {
671  newFragmentInfo->deviceIds.push_back(newFragmentInfo->fragmentId % levelSize);
672  }
673  newFragmentInfo->physicalTableId = physicalTableId_;
674  newFragmentInfo->shard = shard_;
675 
676  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
677  colMapIt != columnMap_.end();
678  ++colMapIt) {
679  // colMapIt->second.unpin_buffer();
680  ChunkKey chunkKey = chunkKeyPrefix_;
681  chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
682  chunkKey.push_back(maxFragmentId_);
683  colMapIt->second.createChunkBuffer(
684  dataMgr_,
685  chunkKey,
686  memoryLevel,
687  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
688  pageSize_);
689  colMapIt->second.init_encoder();
690  }
691 
692  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
693  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
694  return fragmentInfoVec_.back().get();
695 }
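// Added note, not in the original source: deviceIds are assigned round-robin per memory
// level as fragmentId % levelSize. With a hypothetical levelSizes_ of {1, 2} (one CPU
// buffer pool, two GPUs), fragment 5 maps to CPU device 0 and GPU device 1, while
// fragment 6 maps to CPU device 0 and GPU device 0.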
696 
697 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
698  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
699  TableInfo queryInfo;
700  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
701  // right now we don't test predicate, so just return (copy of) all fragments
702  bool fragmentsExist = false;
703  if (fragmentInfoVec_.empty()) {
704  // If we have no fragments add a dummy empty fragment to make the executor
705  // not have separate logic for 0-row tables
706  int maxFragmentId = 0;
707  FragmentInfo emptyFragmentInfo;
708  emptyFragmentInfo.fragmentId = maxFragmentId;
709  emptyFragmentInfo.shadowNumTuples = 0;
710  emptyFragmentInfo.setPhysicalNumTuples(0);
711  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
712  emptyFragmentInfo.physicalTableId = physicalTableId_;
713  emptyFragmentInfo.shard = shard_;
714  queryInfo.fragments.push_back(emptyFragmentInfo);
715  } else {
716  fragmentsExist = true;
717  std::for_each(
718  fragmentInfoVec_.begin(),
719  fragmentInfoVec_.end(),
720  [&queryInfo](const auto& fragment_owned_ptr) {
721  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
722  });
723  }
724  readLock.unlock();
725  queryInfo.setPhysicalNumTuples(0);
726  auto partIt = queryInfo.fragments.begin();
727  if (fragmentsExist) {
728  while (partIt != queryInfo.fragments.end()) {
729  if (partIt->getPhysicalNumTuples() == 0) {
730  // this means that a concurrent insert query inserted tuples into a new fragment
731  // but when the query came in we didn't have this fragment. To make sure we don't
732  // mess up the executor we delete this fragment from the metadatamap (fixes
733  // earlier bug found 2015-05-08)
734  partIt = queryInfo.fragments.erase(partIt);
735  } else {
736  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
737  partIt->getPhysicalNumTuples());
738  ++partIt;
739  }
740  }
741  } else {
742  // We added a dummy fragment and know the table is empty
743  queryInfo.setPhysicalNumTuples(0);
744  }
745  return queryInfo;
746 }
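// Hypothetical usage sketch, not part of the original source: consuming the returned
// TableInfo. The fragment entries are copies, so the shared lock is released before the
// caller iterates; `fragmenter` and `process` are assumed names.
//
//   auto query_info = fragmenter->getFragmentsForQuery();
//   for (const auto& fragment : query_info.fragments) {
//     process(fragment.fragmentId, fragment.getPhysicalNumTuples());
//   }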
747 
748 } // namespace Fragmenter_Namespace