InsertOrderFragmenter.cpp

/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "InsertOrderFragmenter.h"
#include <boost/lexical_cast.hpp>
#include <cassert>
#include <cmath>
#include <iostream>
#include <limits>
#include <thread>
#include <type_traits>
#include "../DataMgr/AbstractBuffer.h"
#include "../DataMgr/DataMgr.h"
#include "../LockMgr/LockMgr.h"
#include "../Shared/checked_alloc.h"
#include "../Shared/thread_count.h"
#include "Shared/Logger.h"

#include <LockMgr/TableLockMgr.h>

#define DROP_FRAGMENT_FACTOR \
  0.97  // drop to 97% of max so we don't keep adding and dropping fragments

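// Example (illustrative values): with maxRows = 1,000,000, dropFragmentsToSize()
// below evicts whole fragments until numTuples_ is at or below roughly 970,000,
// leaving headroom so the very next insert does not immediately trigger another drop.
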
using Chunk_NS::Chunk;

using namespace std;

namespace Fragmenter_Namespace {

InsertOrderFragmenter::InsertOrderFragmenter(
    const vector<int> chunkKeyPrefix,
    vector<Chunk>& chunkVec,
    Data_Namespace::DataMgr* dataMgr,
    Catalog_Namespace::Catalog* catalog,
    const int physicalTableId,
    const int shard,
    const size_t maxFragmentRows,
    const size_t maxChunkSize,
    const size_t pageSize,
    const size_t maxRows,
    const Data_Namespace::MemoryLevel defaultInsertLevel)
    : chunkKeyPrefix_(chunkKeyPrefix)
    , dataMgr_(dataMgr)
    , catalog_(catalog)
    , physicalTableId_(physicalTableId)
    , shard_(shard)
    , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
    , pageSize_(pageSize)
    , numTuples_(0)
    , maxFragmentId_(-1)
    , maxChunkSize_(maxChunkSize)
    , maxRows_(maxRows)
    , fragmenterType_("insert_order")
    , defaultInsertLevel_(defaultInsertLevel)
    , hasMaterializedRowId_(false)
    , mutex_access_inmem_states(new std::mutex) {
  // Note that the fragmenter is not passed virtual columns, so it should only
  // find the rowid column if it is non-virtual (i.e. materialized)

  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
    int columnId = colIt->get_column_desc()->columnId;
    columnMap_[columnId] = *colIt;
    if (colIt->get_column_desc()->columnName == "rowid") {
      hasMaterializedRowId_ = true;
      rowIdColId_ = columnId;
    }
  }
  getChunkMetadata();
}

InsertOrderFragmenter::~InsertOrderFragmenter() {}

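// Rebuilds the fragmenter's in-memory state from chunk metadata persisted on disk
// (only relevant when the default insert level is DISK_LEVEL): it reconstructs
// fragmentInfoVec_ with per-fragment tuple counts and per-column chunk metadata,
// caps maxFragmentRows_ by the widest fixed-length column, and re-opens the insert
// buffers of the last (partially filled) fragment.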
void InsertOrderFragmenter::getChunkMetadata() {
  if (defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    // memory-resident tables won't have anything on disk
    std::vector<std::pair<ChunkKey, ChunkMetadata>> chunk_metadata;
    dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);

    // the keys come back ordered as (database_id, table_id, column_id, fragment_id),
    // but let's sort by (database_id, table_id, fragment_id, column_id) so that all
    // chunks belonging to one fragment are processed together

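    // Example chunk key layout (illustrative): {1, 5, 3, 0} identifies database 1,
    // table 5, column 3, fragment 0; fragment_subkey_index below points at that
    // last slot.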
    int fragment_subkey_index = 3;
    std::sort(chunk_metadata.begin(),
              chunk_metadata.end(),
              [&](const std::pair<ChunkKey, ChunkMetadata>& pair1,
                  const std::pair<ChunkKey, ChunkMetadata>& pair2) {
                return pair1.first[3] < pair2.first[3];
              });

    for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
         ++chunk_itr) {
      int cur_column_id = chunk_itr->first[2];
      int cur_fragment_id = chunk_itr->first[fragment_subkey_index];

      if (fragmentInfoVec_.empty() ||
          cur_fragment_id != fragmentInfoVec_.back().fragmentId) {
        maxFragmentId_ = cur_fragment_id;
        fragmentInfoVec_.emplace_back();
        fragmentInfoVec_.back().fragmentId = cur_fragment_id;
        fragmentInfoVec_.back().setPhysicalNumTuples(chunk_itr->second.numElements);
        numTuples_ += fragmentInfoVec_.back().getPhysicalNumTuples();
        for (const auto level_size : dataMgr_->levelSizes_) {
          fragmentInfoVec_.back().deviceIds.push_back(cur_fragment_id % level_size);
        }
        fragmentInfoVec_.back().shadowNumTuples =
            fragmentInfoVec_.back().getPhysicalNumTuples();
        fragmentInfoVec_.back().physicalTableId = physicalTableId_;
        fragmentInfoVec_.back().shard = shard_;
      } else {
        if (chunk_itr->second.numElements !=
            fragmentInfoVec_.back().getPhysicalNumTuples()) {
          throw std::runtime_error(
              "Inconsistency in num tuples within fragment for table " +
              std::to_string(physicalTableId_) + ", Column " +
              std::to_string(cur_column_id) + ". Fragment Tuples: " +
              std::to_string(fragmentInfoVec_.back().getPhysicalNumTuples()) +
              ", Chunk Tuples: " + std::to_string(chunk_itr->second.numElements));
        }
      }
      fragmentInfoVec_.back().setChunkMetadata(cur_column_id, chunk_itr->second);
    }
  }

  ssize_t maxFixedColSize = 0;

  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
    ssize_t size = colIt->second.get_column_desc()->columnType.get_size();
    if (size == -1) {  // variable length
      varLenColInfo_.insert(std::make_pair(colIt->first, 0));
      size = 8;  // b/c we use this for string and array indices - gross to have magic
                 // number here
    }
    maxFixedColSize = std::max(maxFixedColSize, size);
  }

  // this is the maximum number of rows assuming everything is fixed length
  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
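  // Example (illustrative): with maxChunkSize_ = 1 GB (1,073,741,824 bytes) and a
  // widest fixed-width column of 8 bytes, a fragment is capped at 134,217,728 rows
  // even if maxFragmentRows_ was configured higher.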

  if (fragmentInfoVec_.size() > 0) {
    // Now need to get the insert buffers for each column - should be last
    // fragment
    int lastFragmentId = fragmentInfoVec_.back().fragmentId;
    int deviceId =
        fragmentInfoVec_.back().deviceIds[static_cast<int>(defaultInsertLevel_)];
    for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
      ChunkKey insertKey = chunkKeyPrefix_;  // database_id and table_id
      insertKey.push_back(colIt->first);     // column id
      insertKey.push_back(lastFragmentId);   // fragment id
      colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
      auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
      if (varLenColInfoIt != varLenColInfo_.end()) {
        varLenColInfoIt->second = colIt->second.get_buffer()->size();
      }
    }
  }
}

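// Evicts whole fragments, oldest first (the front of fragmentInfoVec_), until the
// table holds at most maxRows * DROP_FRAGMENT_FACTOR tuples. Used to enforce
// maxRows_ after an insert (see insertDataImpl).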
void InsertOrderFragmenter::dropFragmentsToSize(const size_t maxRows) {
  // not safe to call from outside insertData
  // b/c depends on insertLock around numTuples_

  // don't ever drop the only fragment!
  if (numTuples_ == fragmentInfoVec_.back().getPhysicalNumTuples()) {
    return;
  }

  if (numTuples_ > maxRows) {
    size_t preNumTuples = numTuples_;
    vector<int> dropFragIds;
    size_t targetRows = maxRows * DROP_FRAGMENT_FACTOR;
    while (numTuples_ > targetRows) {
      CHECK_GT(fragmentInfoVec_.size(), size_t(0));
      size_t numFragTuples = fragmentInfoVec_[0].getPhysicalNumTuples();
      dropFragIds.push_back(fragmentInfoVec_[0].fragmentId);
      fragmentInfoVec_.pop_front();
      CHECK_GE(numTuples_, numFragTuples);
      numTuples_ -= numFragTuples;
    }
    deleteFragments(dropFragIds);
    LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
              << " post: " << numTuples_ << " maxRows: " << maxRows;
  }
}

void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
  // Fixes a verified race on sharded tables: queries lock the *logical* table id,
  // while it is the *physical* shard tables that reach this point when fragments
  // overflow during COPY. Locks on a logical table and on its physical tables never
  // intersect, so the two can race. Resolving a logical table to its physical tables
  // in MapDHandler, ParseNode, or other higher layers (where the logical table is
  // locked with table read/write locks) would be overkill; it is simpler to lock the
  // logical table that owns these physical tables here. The downside is that
  // deleteFragments can no longer run in parallel across physical tables, but since
  // it is a short, in-memory operation the loss is negligible.
  auto chunkKeyPrefix = chunkKeyPrefix_;
  if (shard_ >= 0) {
    chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
  }

  // the lock order must stay TableLock >> fragmentInfoMutex_, or SELECT and COPY
  // may deadlock against each other
  using namespace Lock_Namespace;
  WriteLock delete_lock = TableLockMgr::getWriteLockForTable(chunkKeyPrefix);

  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);

  for (const auto fragId : dropFragIds) {
    for (const auto& col : columnMap_) {
      int colId = col.first;
      vector<int> fragPrefix = chunkKeyPrefix_;
      fragPrefix.push_back(colId);
      fragPrefix.push_back(fragId);
      dataMgr_->deleteChunksWithPrefix(fragPrefix);
    }
  }
}

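// Applies externally computed per-fragment ChunkStats (min / max / has_nulls) for a
// single column: for every fragment present in stats_map, the chunk's encoder stats
// are reset, the fragment's chunk metadata is refreshed, and the buffer is marked
// dirty so the new metadata is written out at the next checkpoint.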
void InsertOrderFragmenter::updateChunkStats(
    const ColumnDescriptor* cd,
    std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map) {
  if (shard_ >= 0) {
    LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
  }

  CHECK(cd);
  const auto column_id = cd->columnId;
  const auto col_itr = columnMap_.find(column_id);
  CHECK(col_itr != columnMap_.end());

  for (auto& fragment : fragmentInfoVec_) {
    auto stats_itr = stats_map.find(fragment.fragmentId);
    if (stats_itr != stats_map.end()) {
      auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(column_id);
      CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
      ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
                         physicalTableId_,
                         column_id,
                         fragment.fragmentId};
      auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                             &catalog_->getDataMgr(),
                                             chunk_key,
                                             defaultInsertLevel_,
                                             0,
                                             chunk_meta_it->second.numBytes,
                                             chunk_meta_it->second.numElements);
      auto buf = chunk->get_buffer();
      CHECK(buf);
      auto encoder = buf->encoder.get();
      if (!encoder) {
        throw std::runtime_error("No encoder for chunk " + showChunk(chunk_key));
      }

      auto chunk_stats = stats_itr->second;

      ChunkMetadata old_chunk_metadata;
      encoder->getMetadata(old_chunk_metadata);
      auto& old_chunk_stats = old_chunk_metadata.chunkStats;

      const bool didResetStats = encoder->resetChunkStats(chunk_stats);
      // Use the logical type to display data, since the encoding should be ignored
      const auto logical_ti = get_logical_type_info(cd->columnType);
      if (!didResetStats) {
        VLOG(3) << "Skipping chunk stats reset for " << showChunk(chunk_key);
        VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
                << DatumToString(chunk_stats.max, logical_ti);
        VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
                << DatumToString(chunk_stats.min, logical_ti);
        VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
        continue;  // move to next fragment
      }

      VLOG(2) << "Resetting chunk stats for " << showChunk(chunk_key);
      VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
              << DatumToString(chunk_stats.max, logical_ti);
      VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
              << DatumToString(chunk_stats.min, logical_ti);
      VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");

      // Reset fragment metadata map and set buffer to dirty
      ChunkMetadata new_metadata;
      // Run through fillChunkStats to ensure any transformations to the raw metadata
      // values get applied (e.g. for date in days)
      encoder->getMetadata(new_metadata);
      fragment.setChunkMetadata(column_id, new_metadata);
      fragment.shadowChunkMetadataMap =
          fragment.getChunkMetadataMap();  // TODO(adb): needed?
      buf->setDirty();
    } else {
      LOG(WARNING) << "No chunk stats update found for fragment " << fragment.fragmentId
                   << ", table " << physicalTableId_ << ", column " << column_id;
    }
  }
}

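// Inserts rows under the table-wide insert lock and, for disk-resident tables,
// checkpoints immediately afterwards to close the window for on-disk corruption.
// If anything throws, the table epoch is re-set (which, as noted below, destroys this
// fragmenter object) and the exception is re-thrown.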
void InsertOrderFragmenter::insertData(InsertData& insertDataStruct) {
  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
  try {
    mapd_unique_lock<mapd_shared_mutex> insertLock(
        insertMutex_);  // prevent two threads from trying to insert into the same table
                        // simultaneously

    insertDataImpl(insertDataStruct);

    if (defaultInsertLevel_ ==
        Data_Namespace::DISK_LEVEL) {  // only checkpoint if data is resident on disk
      dataMgr_->checkpoint(
          chunkKeyPrefix_[0],
          chunkKeyPrefix_[1]);  // need to checkpoint here to remove window for corruption
    }
  } catch (...) {
    int32_t tableEpoch =
        catalog_->getTableEpoch(insertDataStruct.databaseId, insertDataStruct.tableId);

    // the statement below deletes *this* object!
    // relying on exception propagation at this stage
    // until we can sort this out in a cleaner fashion
    catalog_->setTableEpoch(
        insertDataStruct.databaseId, insertDataStruct.tableId, tableEpoch);
    throw;
  }
}

void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insertDataStruct) {
  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
  mapd_unique_lock<mapd_shared_mutex> insertLock(
      insertMutex_);  // prevent two threads from trying to insert into the same table
                      // simultaneously
  insertDataImpl(insertDataStruct);
}

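// ALTER TABLE ADD COLUMN path: instead of appending new rows, this back-fills the
// newly added column(s) across every existing fragment, writing exactly as many
// values per fragment as that fragment already holds, then publishes the updated
// chunk metadata under fragmentInfoMutex_.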
void InsertOrderFragmenter::replicateData(const InsertData& insertDataStruct) {
  size_t numRowsLeft = insertDataStruct.numRows;
  for (auto& fragmentInfo : fragmentInfoVec_) {
    fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysical();
    auto numRowsToInsert = fragmentInfo.getPhysicalNumTuples();  // not getNumTuples()
    size_t numRowsCanBeInserted;
    for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
      if (insertDataStruct.bypass[i]) {
        continue;
      }
      auto columnId = insertDataStruct.columnIds[i];
      auto colDesc = insertDataStruct.columnDescriptors.at(columnId);
      CHECK(columnMap_.find(columnId) != columnMap_.end());

      ChunkKey chunkKey = chunkKeyPrefix_;
      chunkKey.push_back(columnId);
      chunkKey.push_back(fragmentInfo.fragmentId);

      auto colMapIt = columnMap_.find(columnId);
      auto& chunk = colMapIt->second;
      if (chunk.isChunkOnDevice(
              dataMgr_,
              chunkKey,
              defaultInsertLevel_,
              fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)])) {
        dataMgr_->deleteChunksWithPrefix(chunkKey);
      }
      chunk.createChunkBuffer(
          dataMgr_,
          chunkKey,
          defaultInsertLevel_,
          fragmentInfo.deviceIds[static_cast<int>(defaultInsertLevel_)]);
      chunk.init_encoder();

      try {
        DataBlockPtr dataCopy = insertDataStruct.data[i];
        auto size = colDesc->columnType.get_size();
        if (0 > size) {
          std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
          varLenColInfo_[columnId] = 0;
          numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
              dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
        } else {
          numRowsCanBeInserted = maxChunkSize_ / size;
        }

        // FIXME: reject the case where the new column is wider than existing columns
        if (numRowsCanBeInserted < numRowsToInsert) {
          throw std::runtime_error("new column '" + colDesc->columnName +
                                   "' wider than existing columns is not supported");
        }

        auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
        {
          std::unique_lock<std::mutex> lck(*fragmentInfo.mutex_access_inmem_states);
          fragmentInfo.shadowChunkMetadataMap[columnId] = chunkMetadata;
        }

        // update total size of var-len column in (actually the last) fragment
        if (0 > size) {
          std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
          varLenColInfo_[columnId] = chunk.get_buffer()->size();
        }
      } catch (...) {
        dataMgr_->deleteChunksWithPrefix(chunkKey);
        throw;
      }
    }
    numRowsLeft -= numRowsToInsert;
  }
  CHECK(0 == numRowsLeft);

  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  for (auto& fragmentInfo : fragmentInfoVec_) {
    fragmentInfo.setChunkMetadataMap(fragmentInfo.shadowChunkMetadataMap);
  }
}

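// Core insert path: rows are appended to the most recent fragment until either
// maxFragmentRows_ is reached or a variable-length column would exceed maxChunkSize_,
// at which point a new fragment is created and the remaining rows spill into it.
// Shadow metadata is accumulated along the way and published at the end under
// fragmentInfoMutex_.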
void InsertOrderFragmenter::insertDataImpl(InsertData& insertDataStruct) {
  // populate the deleted system column if it exists, as it will not come from the
  // client. Do not add this magical column on the replicate ALTER TABLE ADD route,
  // as it is not needed there and will cause issues.
  std::unique_ptr<int8_t[]> data_for_deleted_column;
  for (const auto& cit : columnMap_) {
    if (cit.second.get_column_desc()->isDeletedCol &&
        insertDataStruct.replicate_count == 0) {
      data_for_deleted_column.reset(new int8_t[insertDataStruct.numRows]);
      memset(data_for_deleted_column.get(), 0, insertDataStruct.numRows);
      insertDataStruct.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
      insertDataStruct.columnIds.push_back(cit.second.get_column_desc()->columnId);
      insertDataStruct.columnDescriptors[cit.first] = cit.second.get_column_desc();
      break;
    }
  }
  // MAT: we need to add removal of the empty column we pushed on here, for upstream
  // safety. It should not be a problem, but it needs to be fixed.

  // insert the column into columnMap_ if it is not there yet (ALTER ADD COLUMN)
  for (const auto columnId : insertDataStruct.columnIds) {
    if (columnMap_.end() == columnMap_.find(columnId)) {
      columnMap_.emplace(
          columnId, Chunk_NS::Chunk(insertDataStruct.columnDescriptors.at(columnId)));
    }
  }

  // when replicating (adding) column(s), this path seems not to work; take the
  // separate route instead
  if (insertDataStruct.replicate_count > 0) {
    replicateData(insertDataStruct);
    return;
  }

  std::unordered_map<int, int> inverseInsertDataColIdMap;
  for (size_t insertId = 0; insertId < insertDataStruct.columnIds.size(); ++insertId) {
    inverseInsertDataColIdMap.insert(
        std::make_pair(insertDataStruct.columnIds[insertId], insertId));
  }

  size_t numRowsLeft = insertDataStruct.numRows;
  size_t numRowsInserted = 0;
  vector<DataBlockPtr> dataCopy =
      insertDataStruct.data;  // b/c appendData will move the ptr forward and this
                              // would violate the constness of InsertData
  if (numRowsLeft <= 0) {
    return;
  }

  FragmentInfo* currentFragment = 0;

  if (fragmentInfoVec_.empty()) {  // if no fragments exist for the table
    currentFragment = createNewFragment(defaultInsertLevel_);
  } else {
    currentFragment = &(fragmentInfoVec_.back());
  }
  size_t startFragment = fragmentInfoVec_.size() - 1;
  while (numRowsLeft > 0) {  // may have to create multiple fragments for bulk insert
    // loop until done inserting all rows
    CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
    size_t rowsLeftInCurrentFragment =
        maxFragmentRows_ - currentFragment->shadowNumTuples;
    size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
    if (rowsLeftInCurrentFragment != 0) {
      for (auto& varLenColInfoIt : varLenColInfo_) {
        CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
        size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
        auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
        if (insertIdIt != inverseInsertDataColIdMap.end()) {
          auto colMapIt = columnMap_.find(varLenColInfoIt.first);
          numRowsToInsert = std::min(
              numRowsToInsert,
              colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
                                                             numRowsToInsert,
                                                             numRowsInserted,
                                                             bytesLeft));
        }
      }
    }

    if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
      currentFragment = createNewFragment(defaultInsertLevel_);
      if (numRowsInserted == 0) {
        startFragment++;
      }
      rowsLeftInCurrentFragment = maxFragmentRows_;
      for (auto& varLenColInfoIt : varLenColInfo_) {
        varLenColInfoIt.second = 0;  // reset byte counter
      }
      numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
      for (auto& varLenColInfoIt : varLenColInfo_) {
        CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
        size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
        auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
        if (insertIdIt != inverseInsertDataColIdMap.end()) {
          auto colMapIt = columnMap_.find(varLenColInfoIt.first);
          numRowsToInsert = std::min(
              numRowsToInsert,
              colMapIt->second.getNumElemsForBytesInsertData(dataCopy[insertIdIt->second],
                                                             numRowsToInsert,
                                                             numRowsInserted,
                                                             bytesLeft));
        }
      }
    }

    CHECK_GT(numRowsToInsert, size_t(0));  // would put us into an endless loop as we'd
                                           // never be able to insert anything

    // for each column, append the data in the appropriate insert buffer
    for (size_t i = 0; i < insertDataStruct.columnIds.size(); ++i) {
      int columnId = insertDataStruct.columnIds[i];
      auto colMapIt = columnMap_.find(columnId);
      CHECK(colMapIt != columnMap_.end());
      currentFragment->shadowChunkMetadataMap[columnId] =
          colMapIt->second.appendData(dataCopy[i], numRowsToInsert, numRowsInserted);
      auto varLenColInfoIt = varLenColInfo_.find(columnId);
      if (varLenColInfoIt != varLenColInfo_.end()) {
        varLenColInfoIt->second = colMapIt->second.get_buffer()->size();
      }
    }
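    // Each fragment owns the rowid range
    // [maxFragmentRows_ * fragmentId, maxFragmentRows_ * (fragmentId + 1)), so rowids
    // stay unique within this physical table. Example (illustrative): with
    // maxFragmentRows_ = 32,000,000, fragment 2 already holding 1,000 rows assigns
    // the next batch rowids starting at 64,001,000.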
    if (hasMaterializedRowId_) {
      size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
                       currentFragment->shadowNumTuples;
      // owned buffer so the allocation is released even if appendData throws
      auto rowIdData = std::make_unique<int64_t[]>(numRowsToInsert);
      for (size_t i = 0; i < numRowsToInsert; ++i) {
        rowIdData[i] = i + startId;
      }
      DataBlockPtr rowIdBlock;
      rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(rowIdData.get());
      auto colMapIt = columnMap_.find(rowIdColId_);
      currentFragment->shadowChunkMetadataMap[rowIdColId_] =
          colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
    }

    currentFragment->shadowNumTuples =
        fragmentInfoVec_.back().getPhysicalNumTuples() + numRowsToInsert;
    numRowsLeft -= numRowsToInsert;
    numRowsInserted += numRowsToInsert;
  }
  {
    // Only take the fragment info lock when updating the fragment info map. Otherwise
    // a deadlock can occur: SELECT holds TableReadLock and waits for
    // fragmentInfoMutex_, while COPY FROM holds fragmentInfoMutex_ and waits for
    // TableWriteLock.
    mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
    for (auto partIt = fragmentInfoVec_.begin() + startFragment;
         partIt != fragmentInfoVec_.end();
         ++partIt) {
      partIt->setPhysicalNumTuples(partIt->shadowNumTuples);
      partIt->setChunkMetadataMap(partIt->shadowChunkMetadataMap);
    }
  }
  numTuples_ += insertDataStruct.numRows;
  dropFragmentsToSize(maxRows_);
}

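// Creates the next fragment (maxFragmentId_ + 1), allocates an insert buffer for every
// column at the requested memory level, and registers it under fragmentInfoMutex_.
// Device ids are assigned round-robin per memory level (fragmentId % levelSize), so,
// for example, with two GPUs consecutive fragments alternate between device 0 and 1.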
FragmentInfo* InsertOrderFragmenter::createNewFragment(
    const Data_Namespace::MemoryLevel memoryLevel) {
  // also sets the new fragment as the insertBuffer for each column

  maxFragmentId_++;
  FragmentInfo newFragmentInfo;
  newFragmentInfo.fragmentId = maxFragmentId_;
  newFragmentInfo.shadowNumTuples = 0;
  newFragmentInfo.setPhysicalNumTuples(0);
  for (const auto levelSize : dataMgr_->levelSizes_) {
    newFragmentInfo.deviceIds.push_back(newFragmentInfo.fragmentId % levelSize);
  }
  newFragmentInfo.physicalTableId = physicalTableId_;
  newFragmentInfo.shard = shard_;

  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
       colMapIt != columnMap_.end();
       ++colMapIt) {
    // colMapIt->second.unpin_buffer();
    ChunkKey chunkKey = chunkKeyPrefix_;
    chunkKey.push_back(colMapIt->second.get_column_desc()->columnId);
    chunkKey.push_back(maxFragmentId_);
    colMapIt->second.createChunkBuffer(
        dataMgr_,
        chunkKey,
        memoryLevel,
        newFragmentInfo.deviceIds[static_cast<int>(memoryLevel)],
        pageSize_);
    colMapIt->second.init_encoder();
  }

  mapd_lock_guard<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  fragmentInfoVec_.push_back(newFragmentInfo);
  return &(fragmentInfoVec_.back());
}

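// Returns a consistent snapshot of fragment metadata for query execution. The copy is
// taken under a shared lock; fragments whose tuple count is still zero (created by a
// concurrent insert but not yet populated) are dropped from the snapshot, and an empty
// table gets a single dummy fragment so the executor needs no special 0-row handling.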
TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
  mapd_shared_lock<mapd_shared_mutex> readLock(fragmentInfoMutex_);
  TableInfo queryInfo;
  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
  // right now we don't test the predicate, so just return (a copy of) all fragments
  bool fragmentsExist = false;
  if (fragmentInfoVec_.empty()) {
    // If we have no fragments, add a dummy empty fragment so the executor
    // does not need separate logic for 0-row tables
    int maxFragmentId = 0;
    FragmentInfo emptyFragmentInfo;
    emptyFragmentInfo.fragmentId = maxFragmentId;
    emptyFragmentInfo.shadowNumTuples = 0;
    emptyFragmentInfo.setPhysicalNumTuples(0);
    emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
    emptyFragmentInfo.physicalTableId = physicalTableId_;
    emptyFragmentInfo.shard = shard_;
    queryInfo.fragments.push_back(emptyFragmentInfo);
  } else {
    fragmentsExist = true;
    queryInfo.fragments = fragmentInfoVec_;  // makes a copy
  }
  readLock.unlock();
  queryInfo.setPhysicalNumTuples(0);
  auto partIt = queryInfo.fragments.begin();
  if (fragmentsExist) {
    while (partIt != queryInfo.fragments.end()) {
      if (partIt->getPhysicalNumTuples() == 0) {
        // this means that a concurrent insert inserted tuples into a new fragment
        // that did not yet exist when this query came in. To make sure we don't
        // mess up the executor we delete this fragment from the metadata map (fixes
        // earlier bug found 2015-05-08)
        partIt = queryInfo.fragments.erase(partIt);
      } else {
        queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
                                       partIt->getPhysicalNumTuples());
        ++partIt;
      }
    }
  } else {
    // We added a dummy fragment and know the table is empty
    queryInfo.setPhysicalNumTuples(0);
  }
  return queryInfo;
}

}  // namespace Fragmenter_Namespace