InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Fragmenter/InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
31 #include "DataMgr/FileMgr/GlobalFileMgr.h"
32 #include "LockMgr/LockMgr.h"
33 #include "Logger/Logger.h"
34 
35 #include "Shared/checked_alloc.h"
36 #include "Shared/thread_count.h"
37 
38 #define DROP_FRAGMENT_FACTOR \
39  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
40 
41 using Chunk_NS::Chunk;
44 
46 
47 using namespace std;
48 
49 namespace Fragmenter_Namespace {
50 
51 InsertOrderFragmenter::InsertOrderFragmenter(
52  const vector<int> chunkKeyPrefix,
53  vector<Chunk>& chunkVec,
54  Data_Namespace::DataMgr* dataMgr,
55  Catalog_Namespace::Catalog* catalog,
56  const int physicalTableId,
57  const int shard,
58  const size_t maxFragmentRows,
59  const size_t maxChunkSize,
60  const size_t pageSize,
61  const size_t maxRows,
62  const Data_Namespace::MemoryLevel defaultInsertLevel,
63  const bool uses_foreign_storage)
64  : chunkKeyPrefix_(chunkKeyPrefix)
65  , dataMgr_(dataMgr)
66  , catalog_(catalog)
67  , physicalTableId_(physicalTableId)
68  , shard_(shard)
69  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
70  , pageSize_(pageSize)
71  , numTuples_(0)
72  , maxFragmentId_(-1)
73  , maxChunkSize_(maxChunkSize)
74  , maxRows_(maxRows)
75  , fragmenterType_("insert_order")
76  , defaultInsertLevel_(defaultInsertLevel)
77  , uses_foreign_storage_(uses_foreign_storage)
78  , hasMaterializedRowId_(false)
79  , mutex_access_inmem_states(new std::mutex) {
80  // Note that the fragmenter is not passed virtual columns and so should only
81  // find the rowid column if it is non-virtual
82 
83  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
84  int columnId = colIt->getColumnDesc()->columnId;
85  columnMap_[columnId] = *colIt;
86  if (colIt->getColumnDesc()->columnName == "rowid") {
87  hasMaterializedRowId_ = true;
88  rowIdColId_ = columnId;
89  }
90  }
91  conditionallyInstantiateFileMgrWithParams();
92  getChunkMetadata();
93 }
94 
95 InsertOrderFragmenter::~InsertOrderFragmenter() {}
96 
97 namespace {
98 
103 int compute_device_for_fragment(const int table_id,
104  const int fragment_id,
105  const int num_devices) {
106  if (g_use_table_device_offset) {
107  return (table_id + fragment_id) % num_devices;
108  } else {
109  return fragment_id % num_devices;
110  }
111 }
112 
113 } // namespace
114 
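// Illustrative sketch (not part of the original file): how the round-robin device
// assignment above spreads consecutive fragments across the devices of a memory
// level. The flag and function below are local stand-ins that mirror the logic of
// compute_device_for_fragment(); g_use_table_device_offset itself is a global
// defined elsewhere in the codebase.
namespace example {
constexpr bool use_table_device_offset = true;  // stand-in for g_use_table_device_offset

inline int device_for_fragment(const int table_id,
                               const int fragment_id,
                               const int num_devices) {
  return use_table_device_offset ? (table_id + fragment_id) % num_devices
                                 : fragment_id % num_devices;
}
}  // namespace example

// With 2 devices, fragments 0..3 of table 7 land on devices 1, 0, 1, 0:
//   example::device_for_fragment(7, 0, 2) == 1, example::device_for_fragment(7, 1, 2) == 0, ...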
115 void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
116  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
117  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
118  // storage per table
119  if (!uses_foreign_storage_ &&
120  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
121  const TableDescriptor* td =
122  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
123  File_Namespace::FileMgrParams fileMgrParams;
124  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
125  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
126  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
127  }
128 }
129 
130 void InsertOrderFragmenter::getChunkMetadata() {
131  if (uses_foreign_storage_ ||
132  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
133  // memory-resident tables won't have anything on disk
134  ChunkMetadataVector chunk_metadata;
135  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
136 
137  // data comes like this - database_id, table_id, column_id, fragment_id
138  // but let's sort by database_id, table_id, fragment_id, column_id
139 
140  int fragment_subkey_index = 3;
141  std::sort(chunk_metadata.begin(),
142  chunk_metadata.end(),
143  [&](const auto& pair1, const auto& pair2) {
144  return pair1.first[3] < pair2.first[3];
145  });
146 
147  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
148  ++chunk_itr) {
149  int cur_column_id = chunk_itr->first[2];
150  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
151 
152  if (fragmentInfoVec_.empty() ||
153  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
154  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
155  CHECK(new_fragment_info);
156  maxFragmentId_ = cur_fragment_id;
157  new_fragment_info->fragmentId = cur_fragment_id;
158  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
159  numTuples_ += new_fragment_info->getPhysicalNumTuples();
160  for (const auto level_size : dataMgr_->levelSizes_) {
161  new_fragment_info->deviceIds.push_back(
162  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
163  }
164  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
165  new_fragment_info->physicalTableId = physicalTableId_;
166  new_fragment_info->shard = shard_;
167  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
168  } else {
169  if (chunk_itr->second->numElements !=
170  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
171  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
172  std::to_string(physicalTableId_) + ", Column " +
173  std::to_string(cur_column_id) + ". Fragment Tuples: " +
174  std::to_string(
175  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
176  ", Chunk Tuples: " +
177  std::to_string(chunk_itr->second->numElements);
178  }
179  }
180  CHECK(fragmentInfoVec_.back().get());
181  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
182  }
183  }
184 
185  size_t maxFixedColSize = 0;
186 
187  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
188  auto size = colIt->second.getColumnDesc()->columnType.get_size();
189  if (size == -1) { // variable length
190  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
191  size = 8; // b/c we use this for string and array indices - gross to have magic
192  // number here
193  }
194  CHECK_GE(size, 0);
195  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
196  }
197 
198  // this is maximum number of rows assuming everything is fixed length
199  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
200  setLastFragmentVarLenColumnSizes();
201 }
202 
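// Illustrative sketch (not part of the original file): the fragment-size cap computed
// at the end of getChunkMetadata(). Each fixed-width column chunk of a fragment must
// fit in maxChunkSize_ bytes, and variable-length columns count as 8-byte index
// entries, so the widest column bounds the per-fragment row count.
#include <algorithm>
#include <cstddef>

inline size_t cap_fragment_rows(const size_t max_fragment_rows,
                                const size_t max_chunk_size,
                                const size_t max_fixed_col_size) {
  return std::min(max_fragment_rows, max_chunk_size / max_fixed_col_size);
}

// e.g. cap_fragment_rows(32'000'000, 1'073'741'824, 8)  == 32'000'000 (row cap wins)
//      cap_fragment_rows(32'000'000, 1'073'741'824, 64) == 16'777'216 (chunk size wins)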
203 void InsertOrderFragmenter::dropFragmentsToSize(const size_t max_rows) {
204  mapd_unique_lock<mapd_shared_mutex> insert_lock(insertMutex_);
205  dropFragmentsToSizeNoInsertLock(max_rows);
206 }
207 
208 void InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock(const size_t max_rows) {
209  // not safe to call from outside insertData
210  // b/c depends on insertLock around numTuples_
211 
212  // don't ever drop the only fragment!
213  if (numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
214  return;
215  }
216 
217  if (numTuples_ > max_rows) {
218  size_t preNumTuples = numTuples_;
219  vector<int> dropFragIds;
220  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
221  while (numTuples_ > targetRows) {
222  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
223  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
224  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
225  fragmentInfoVec_.pop_front();
226  CHECK_GE(numTuples_, numFragTuples);
227  numTuples_ -= numFragTuples;
228  }
229  deleteFragments(dropFragIds);
230  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
231  << " post: " << numTuples_ << " maxRows: " << max_rows;
232  }
233 }
234 
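// Illustrative sketch (not part of the original file): the eviction target used by
// dropFragmentsToSizeNoInsertLock(). Dropping down to 97% of max_rows
// (DROP_FRAGMENT_FACTOR) rather than exactly max_rows keeps the table from shedding a
// fragment on every subsequent insert. Oldest fragments are dropped first, matching
// insert order.
#include <cstddef>
#include <deque>

inline size_t fragments_to_drop(const std::deque<size_t>& fragment_tuple_counts,
                                size_t num_tuples,
                                const size_t max_rows) {
  if (num_tuples <= max_rows) {
    return 0;
  }
  const auto target_rows = static_cast<size_t>(max_rows * 0.97);
  size_t num_to_drop = 0;
  for (const size_t frag_tuples : fragment_tuple_counts) {
    if (num_tuples <= target_rows) {
      break;
    }
    num_tuples -= frag_tuples;  // oldest fragment goes first
    ++num_to_drop;
  }
  return num_to_drop;
}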
235 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
236  // Fix a verified loophole on sharded logical tables: the logical table is locked
237  // using its logical tableId, but it is the physical tables that reach this point
238  // when fragments overflow during COPY. Locks on a logical table and its physical
239  // tables never intersect, which opens the door to races. Resolving a logical table
240  // to its physical tables in DBHandler, ParseNode, or other higher layers (where the
241  // logical table is locked with Table Read/Write locks) would be overkill; it is
242  // easier to lock the logical table of the physical tables here. A downside is the
243  // loss of parallel execution of deleteFragments across physical tables, but since
244  // deleteFragments is a short in-memory operation, that loss seems acceptable.
245  auto chunkKeyPrefix = chunkKeyPrefix_;
246  if (shard_ >= 0) {
247  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
248  }
249 
250  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
251  // SELECT and COPY may enter a deadlock
252  const auto delete_lock =
254 
255  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
256 
257  for (const auto fragId : dropFragIds) {
258  for (const auto& col : columnMap_) {
259  int colId = col.first;
260  vector<int> fragPrefix = chunkKeyPrefix_;
261  fragPrefix.push_back(colId);
262  fragPrefix.push_back(fragId);
263  dataMgr_->deleteChunksWithPrefix(fragPrefix);
264  }
265  }
266 }
267 
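// Illustrative sketch (not part of the original file): chunk keys are vectors of the
// form {db_id, table_id, column_id, fragment_id}. deleteFragments() builds one such
// key per (column, dropped fragment) and asks the DataMgr to delete every chunk
// matching that prefix.
#include <vector>

using ExampleChunkKey = std::vector<int>;  // mirrors ChunkKey (types.h)

inline ExampleChunkKey make_fragment_chunk_key(ExampleChunkKey table_prefix,  // {db_id, table_id}
                                               const int column_id,
                                               const int fragment_id) {
  table_prefix.push_back(column_id);
  table_prefix.push_back(fragment_id);
  return table_prefix;
}

// e.g. make_fragment_chunk_key({1, 42}, /*column_id=*/3, /*fragment_id=*/7) == {1, 42, 3, 7}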
268 void InsertOrderFragmenter::updateColumnChunkMetadata(
269  const ColumnDescriptor* cd,
270  const int fragment_id,
271  const std::shared_ptr<ChunkMetadata> metadata) {
272  // synchronize concurrent accesses to fragmentInfoVec_
273  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
274 
275  CHECK(metadata.get());
276  auto fragment_info = getFragmentInfo(fragment_id);
277  CHECK(fragment_info);
278  fragment_info->setChunkMetadata(cd->columnId, metadata);
279 }
280 
281 void InsertOrderFragmenter::updateChunkStats(
282  const ColumnDescriptor* cd,
283  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
284  std::optional<Data_Namespace::MemoryLevel> memory_level) {
285  // synchronize concurrent accesses to fragmentInfoVec_
286  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
292  if (shard_ >= 0) {
293  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
294  }
295 
296  CHECK(cd);
297  const auto column_id = cd->columnId;
298  const auto col_itr = columnMap_.find(column_id);
299  CHECK(col_itr != columnMap_.end());
300 
301  for (auto const& fragment : fragmentInfoVec_) {
302  auto stats_itr = stats_map.find(fragment->fragmentId);
303  if (stats_itr != stats_map.end()) {
304  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
305  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
306  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
307  physicalTableId_,
308  column_id,
309  fragment->fragmentId};
310  auto chunk = Chunk_NS::Chunk::getChunk(cd,
311  &catalog_->getDataMgr(),
312  chunk_key,
313  memory_level.value_or(defaultInsertLevel_),
314  0,
315  chunk_meta_it->second->numBytes,
316  chunk_meta_it->second->numElements);
317  auto buf = chunk->getBuffer();
318  CHECK(buf);
319  if (!buf->hasEncoder()) {
320  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
321  }
322  auto encoder = buf->getEncoder();
323 
324  auto chunk_stats = stats_itr->second;
325 
326  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
327  encoder->getMetadata(old_chunk_metadata);
328  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
329 
330  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
331  // Use the logical type to display data, since the encoding should be ignored
332  const auto logical_ti = cd->columnType.is_dict_encoded_string()
333  ? SQLTypeInfo(kBIGINT)
334  : get_logical_type_info(cd->columnType);
335  if (!didResetStats) {
336  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
337  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
338  << DatumToString(chunk_stats.max, logical_ti);
339  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
340  << DatumToString(chunk_stats.min, logical_ti);
341  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
342  continue; // move to next fragment
343  }
344 
345  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
346  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
347  << DatumToString(chunk_stats.max, logical_ti);
348  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
349  << DatumToString(chunk_stats.min, logical_ti);
350  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
351 
352  // Reset fragment metadata map and set buffer to dirty
353  auto new_metadata = std::make_shared<ChunkMetadata>();
354  // Run through fillChunkStats to ensure any transformations to the raw metadata
355  // values get applied (e.g. for date in days)
356  encoder->getMetadata(new_metadata);
357 
358  fragment->setChunkMetadata(column_id, new_metadata);
359  fragment->shadowChunkMetadataMap =
360  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
361  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
362  buf->setDirty();
363  }
364  } else {
365  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
366  << ", table " << physicalTableId_
367  << ", column " << column_id;
368  }
369  }
370 }
371 
372 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
373  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
374  fragmentInfoVec_.end(),
375  [fragment_id](const auto& fragment) -> bool {
376  return fragment->fragmentId == fragment_id;
377  });
378  CHECK(fragment_it != fragmentInfoVec_.end());
379  return fragment_it->get();
380 }
381 
382 bool InsertOrderFragmenter::isAddingNewColumns(const InsertData& insert_data) const {
383  bool all_columns_already_exist = true, all_columns_are_new = true;
384  for (const auto column_id : insert_data.columnIds) {
385  if (columnMap_.find(column_id) == columnMap_.end()) {
386  all_columns_already_exist = false;
387  } else {
388  all_columns_are_new = false;
389  }
390  }
391  // only one should be TRUE
392  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
393  CHECK(either_all_exist_or_all_new);
394  return all_columns_are_new;
395 }
396 
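// Illustrative sketch (not part of the original file): the invariant enforced by
// isAddingNewColumns(). An InsertData batch must reference either only columns that
// already exist in columnMap_ or only brand-new columns (handled by addColumns());
// a mix of the two trips the CHECK above.
#include <set>
#include <vector>

inline bool all_new_or_all_existing(const std::vector<int>& insert_column_ids,
                                    const std::set<int>& existing_column_ids) {
  bool all_exist = true;
  bool all_new = true;
  for (const int column_id : insert_column_ids) {
    if (existing_column_ids.count(column_id)) {
      all_new = false;
    } else {
      all_exist = false;
    }
  }
  return all_exist != all_new;  // exactly one of the two must hold
}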
397 void InsertOrderFragmenter::insertData(InsertData& insert_data_struct) {
398  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
399  try {
400  // prevent two threads from trying to insert into the same table simultaneously
401  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
402  if (!isAddingNewColumns(insert_data_struct)) {
403  insertDataImpl(insert_data_struct);
404  } else {
405  addColumns(insert_data_struct);
406  }
407  if (defaultInsertLevel_ ==
408  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
409  dataMgr_->checkpoint(
410  chunkKeyPrefix_[0],
411  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
412  }
413  } catch (...) {
414  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
415  insert_data_struct.tableId);
416  // the statement below deletes *this* object!
417  // relying on exception propagation at this stage
418  // until we can sort this out in a cleaner fashion
419  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
420  throw;
421  }
422 }
423 
424 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insert_data_struct) {
425  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
426  mapd_unique_lock<mapd_shared_mutex> insertLock(
427  insertMutex_); // prevent two threads from trying to insert into the same table
428  // simultaneously
429  if (!isAddingNewColumns(insert_data_struct)) {
430  insertDataImpl(insert_data_struct);
431  } else {
432  addColumns(insert_data_struct);
433  }
434 }
435 
436 void InsertOrderFragmenter::addColumns(const InsertData& insertDataStruct) {
437  // synchronize concurrent accesses to fragmentInfoVec_
438  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
439  size_t numRowsLeft = insertDataStruct.numRows;
440  for (const auto columnId : insertDataStruct.columnIds) {
441  CHECK(columnMap_.end() == columnMap_.find(columnId));
442  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
443  CHECK(columnDesc);
444  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
445  }
446  try {
447  for (auto const& fragmentInfo : fragmentInfoVec_) {
448  fragmentInfo->shadowChunkMetadataMap =
449  fragmentInfo->getChunkMetadataMapPhysicalCopy();
450  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
451  size_t numRowsCanBeInserted;
452  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
453  auto columnId = insertDataStruct.columnIds[i];
454  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
455  CHECK(colDesc);
456  CHECK(columnMap_.find(columnId) != columnMap_.end());
457 
458  ChunkKey chunkKey = chunkKeyPrefix_;
459  chunkKey.push_back(columnId);
460  chunkKey.push_back(fragmentInfo->fragmentId);
461 
462  auto colMapIt = columnMap_.find(columnId);
463  auto& chunk = colMapIt->second;
464  if (chunk.isChunkOnDevice(
465  dataMgr_,
466  chunkKey,
467  defaultInsertLevel_,
468  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
469  dataMgr_->deleteChunksWithPrefix(chunkKey);
470  }
471  chunk.createChunkBuffer(
472  dataMgr_,
473  chunkKey,
474  defaultInsertLevel_,
475  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
476  chunk.initEncoder();
477 
478  try {
479  DataBlockPtr dataCopy = insertDataStruct.data[i];
480  auto size = colDesc->columnType.get_size();
481  if (0 > size) {
482  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
483  varLenColInfo_[columnId] = 0;
484  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
485  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
486  } else {
487  numRowsCanBeInserted = maxChunkSize_ / size;
488  }
489 
490  // FIXME: abort a case in which new column is wider than existing columns
491  if (numRowsCanBeInserted < numRowsToInsert) {
492  throw std::runtime_error("new column '" + colDesc->columnName +
493  "' wider than existing columns is not supported");
494  }
495 
496  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
497  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
498 
499  // update total size of var-len column in (actually the last) fragment
500  if (0 > size) {
501  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
502  varLenColInfo_[columnId] = chunk.getBuffer()->size();
503  }
504  } catch (...) {
505  dataMgr_->deleteChunksWithPrefix(chunkKey);
506  throw;
507  }
508  }
509  numRowsLeft -= numRowsToInsert;
510  }
511  CHECK(0 == numRowsLeft);
512  } catch (const std::exception& e) {
513  for (const auto columnId : insertDataStruct.columnIds) {
514  columnMap_.erase(columnId);
515  }
516  throw e;
517  }
518 
519  for (auto const& fragmentInfo : fragmentInfoVec_) {
520  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
521  }
522 }
523 
524 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
525  // prevent concurrent insert rows and drop column
526  mapd_unique_lock<mapd_shared_mutex> insertLock(insertMutex_);
527  // synchronize concurrent accesses to fragmentInfoVec_
528  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
529  for (auto const& fragmentInfo : fragmentInfoVec_) {
530  fragmentInfo->shadowChunkMetadataMap =
531  fragmentInfo->getChunkMetadataMapPhysicalCopy();
532  }
533 
534  for (const auto columnId : columnIds) {
535  auto cit = columnMap_.find(columnId);
536  if (columnMap_.end() != cit) {
537  columnMap_.erase(cit);
538  }
539 
540  vector<int> fragPrefix = chunkKeyPrefix_;
541  fragPrefix.push_back(columnId);
542  dataMgr_->deleteChunksWithPrefix(fragPrefix);
543 
544  for (const auto& fragmentInfo : fragmentInfoVec_) {
545  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
546  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
547  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
548  }
549  }
550  }
551  for (const auto& fragmentInfo : fragmentInfoVec_) {
552  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
553  }
554 }
555 
556 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
557  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
558 
559  for (auto const& fragment : fragmentInfoVec_) {
560  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
561  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
562  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
563  if (chunk_stats.max.tinyintval == 1) {
564  return true;
565  }
566  }
567  return false;
568 }
569 
570 void InsertOrderFragmenter::insertDataImpl(InsertData& insert_data) {
571  // populate deleted system column if it should exist, as it will not come from client
572  std::unique_ptr<int8_t[]> data_for_deleted_column;
573  for (const auto& cit : columnMap_) {
574  if (cit.second.getColumnDesc()->isDeletedCol) {
575  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
576  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
577  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
578  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
579  insert_data.is_default.push_back(false);
580  break;
581  }
582  }
583  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
584  std::unordered_map<int, int> inverseInsertDataColIdMap;
585  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
586  inverseInsertDataColIdMap.insert(
587  std::make_pair(insert_data.columnIds[insertId], insertId));
588  }
589 
590  size_t numRowsLeft = insert_data.numRows;
591  size_t numRowsInserted = 0;
592  vector<DataBlockPtr> dataCopy =
593  insert_data.data; // bc append data will move ptr forward and this violates
594  // constness of InsertData
595  if (numRowsLeft <= 0) {
596  return;
597  }
598 
599  FragmentInfo* currentFragment{nullptr};
600 
601  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
602  // feels fragile
603  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
604  currentFragment = createNewFragment(defaultInsertLevel_);
605  } else {
606  currentFragment = fragmentInfoVec_.back().get();
607  }
608  CHECK(currentFragment);
609 
610  size_t startFragment = fragmentInfoVec_.size() - 1;
611 
612  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
613  // loop until done inserting all rows
614  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
615  size_t rowsLeftInCurrentFragment =
616  maxFragmentRows_ - currentFragment->shadowNumTuples;
617  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
618  if (rowsLeftInCurrentFragment != 0) {
619  for (auto& varLenColInfoIt : varLenColInfo_) {
620  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
621  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
622  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
623  if (insertIdIt != inverseInsertDataColIdMap.end()) {
624  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
625  numRowsToInsert = std::min(numRowsToInsert,
626  colMapIt->second.getNumElemsForBytesInsertData(
627  dataCopy[insertIdIt->second],
628  numRowsToInsert,
629  numRowsInserted,
630  bytesLeft,
631  insert_data.is_default[insertIdIt->second]));
632  }
633  }
634  }
635 
636  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
637  currentFragment = createNewFragment(defaultInsertLevel_);
638  if (numRowsInserted == 0) {
639  startFragment++;
640  }
641  rowsLeftInCurrentFragment = maxFragmentRows_;
642  for (auto& varLenColInfoIt : varLenColInfo_) {
643  varLenColInfoIt.second = 0; // reset byte counter
644  }
645  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
646  for (auto& varLenColInfoIt : varLenColInfo_) {
647  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
648  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
649  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
650  if (insertIdIt != inverseInsertDataColIdMap.end()) {
651  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
652  numRowsToInsert = std::min(numRowsToInsert,
653  colMapIt->second.getNumElemsForBytesInsertData(
654  dataCopy[insertIdIt->second],
655  numRowsToInsert,
656  numRowsInserted,
657  bytesLeft,
658  insert_data.is_default[insertIdIt->second]));
659  }
660  }
661  }
662 
663  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
664  // never be able to insert anything
665 
666  {
667  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
668  // for each column, append the data in the appropriate insert buffer
669  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
670  int columnId = insert_data.columnIds[i];
671  auto colMapIt = columnMap_.find(columnId);
672  CHECK(colMapIt != columnMap_.end());
673  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
674  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
675  auto varLenColInfoIt = varLenColInfo_.find(columnId);
676  if (varLenColInfoIt != varLenColInfo_.end()) {
677  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
678  }
679  }
680  if (hasMaterializedRowId_) {
681  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
682  currentFragment->shadowNumTuples;
683  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
684  for (size_t i = 0; i < numRowsToInsert; ++i) {
685  row_id_data[i] = i + startId;
686  }
687  DataBlockPtr rowIdBlock;
688  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
689  auto colMapIt = columnMap_.find(rowIdColId_);
690  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
691  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
692  }
693 
694  currentFragment->shadowNumTuples =
695  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
696  numRowsLeft -= numRowsToInsert;
697  numRowsInserted += numRowsToInsert;
698  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
699  partIt != fragmentInfoVec_.end();
700  ++partIt) {
701  auto fragment_ptr = partIt->get();
702  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
703  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
704  }
705  }
706  }
707  numTuples_ += insert_data.numRows;
708  dropFragmentsToSizeNoInsertLock(maxRows_);
709 }
710 
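// Illustrative sketch (not part of the original file): how insertDataImpl() assigns
// materialized rowid values. Each fragment owns a contiguous block of maxFragmentRows_
// ids, so the i-th row appended to a fragment gets a globally unique, reproducible id
// without any cross-fragment coordination.
#include <cstddef>
#include <cstdint>

inline int64_t rowid_for_appended_row(const size_t max_fragment_rows,
                                      const int fragment_id,
                                      const size_t rows_already_in_fragment,
                                      const size_t i) {
  const size_t start_id = max_fragment_rows * fragment_id + rows_already_in_fragment;
  return static_cast<int64_t>(start_id + i);
}

// e.g. with maxFragmentRows_ = 32'000'000, the first row appended to fragment 2 that
// already holds 100 rows gets rowid 64'000'100.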
711 FragmentInfo* InsertOrderFragmenter::createNewFragment(
712  const Data_Namespace::MemoryLevel memoryLevel) {
713  // also sets the new fragment as the insertBuffer for each column
714 
715  maxFragmentId_++;
716  auto newFragmentInfo = std::make_unique<FragmentInfo>();
717  newFragmentInfo->fragmentId = maxFragmentId_;
718  newFragmentInfo->shadowNumTuples = 0;
719  newFragmentInfo->setPhysicalNumTuples(0);
720  for (const auto levelSize : dataMgr_->levelSizes_) {
721  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
722  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
723  }
724  newFragmentInfo->physicalTableId = physicalTableId_;
725  newFragmentInfo->shard = shard_;
726 
727  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
728  colMapIt != columnMap_.end();
729  ++colMapIt) {
730  ChunkKey chunkKey = chunkKeyPrefix_;
731  chunkKey.push_back(colMapIt->second.getColumnDesc()->columnId);
732  chunkKey.push_back(maxFragmentId_);
733  colMapIt->second.createChunkBuffer(
734  dataMgr_,
735  chunkKey,
736  memoryLevel,
737  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
738  pageSize_);
739  colMapIt->second.initEncoder();
740  }
741 
742  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
743  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
744  return fragmentInfoVec_.back().get();
745 }
746 
747 size_t InsertOrderFragmenter::getNumFragments() {
748  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
749  return fragmentInfoVec_.size();
750 }
751 
752 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
753  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
754  TableInfo queryInfo;
755  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
756  // right now we don't test predicate, so just return (copy of) all fragments
757  bool fragmentsExist = false;
758  if (fragmentInfoVec_.empty()) {
759  // If we have no fragments add a dummy empty fragment to make the executor
760  // not have separate logic for 0-row tables
761  int maxFragmentId = 0;
762  FragmentInfo emptyFragmentInfo;
763  emptyFragmentInfo.fragmentId = maxFragmentId;
764  emptyFragmentInfo.shadowNumTuples = 0;
765  emptyFragmentInfo.setPhysicalNumTuples(0);
766  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
767  emptyFragmentInfo.physicalTableId = physicalTableId_;
768  emptyFragmentInfo.shard = shard_;
769  queryInfo.fragments.push_back(emptyFragmentInfo);
770  } else {
771  fragmentsExist = true;
772  std::for_each(
773  fragmentInfoVec_.begin(),
774  fragmentInfoVec_.end(),
775  [&queryInfo](const auto& fragment_owned_ptr) {
776  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
777  });
778  }
779  readLock.unlock();
780  queryInfo.setPhysicalNumTuples(0);
781  auto partIt = queryInfo.fragments.begin();
782  if (fragmentsExist) {
783  while (partIt != queryInfo.fragments.end()) {
784  if (partIt->getPhysicalNumTuples() == 0) {
785  // this means that a concurrent insert query inserted tuples into a new fragment
786  // but when the query came in we didn't have this fragment. To make sure we
787  // don't mess up the executor we delete this fragment from the metadatamap
788  // (fixes earlier bug found 2015-05-08)
789  partIt = queryInfo.fragments.erase(partIt);
790  } else {
791  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
792  partIt->getPhysicalNumTuples());
793  ++partIt;
794  }
795  }
796  } else {
797  // We added a dummy fragment and know the table is empty
798  queryInfo.setPhysicalNumTuples(0);
799  }
800  return queryInfo;
801 }
802 
803 void InsertOrderFragmenter::resetSizesFromFragments() {
804  mapd_shared_lock<mapd_shared_mutex> read_lock(fragmentInfoMutex_);
805  numTuples_ = 0;
806  for (const auto& fragment_info : fragmentInfoVec_) {
807  numTuples_ += fragment_info->getPhysicalNumTuples();
808  }
809  setLastFragmentVarLenColumnSizes();
810 }
811 
812 void InsertOrderFragmenter::setLastFragmentVarLenColumnSizes() {
813  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
814  // Now need to get the insert buffers for each column - should be last
815  // fragment
816  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
817  // TODO: add accessor here for safe indexing
818  int deviceId =
819  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
820  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
821  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
822  insertKey.push_back(colIt->first); // column id
823  insertKey.push_back(lastFragmentId); // fragment id
824  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
825  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
826  if (varLenColInfoIt != varLenColInfo_.end()) {
827  varLenColInfoIt->second = colIt->second.getBuffer()->size();
828  }
829  }
830  }
831 }
832 } // namespace Fragmenter_Namespace