OmniSciDB  c1a53651b2
InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Fragmenter/InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
32 #include "LockMgr/LockMgr.h"
33 #include "Logger/Logger.h"
34 
35 #include "Shared/checked_alloc.h"
36 #include "Shared/thread_count.h"
37 
38 #define DROP_FRAGMENT_FACTOR \
39  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
40 
41 using Chunk_NS::Chunk;
44 
46 
47 using namespace std;
48 
49 namespace Fragmenter_Namespace {
50 
51 InsertOrderFragmenter::InsertOrderFragmenter(
52  const vector<int> chunkKeyPrefix,
53  vector<Chunk>& chunkVec,
54  Data_Namespace::DataMgr* dataMgr,
55  Catalog_Namespace::Catalog* catalog,
56  const int physicalTableId,
57  const int shard,
58  const size_t maxFragmentRows,
59  const size_t maxChunkSize,
60  const size_t pageSize,
61  const size_t maxRows,
62  const Data_Namespace::MemoryLevel defaultInsertLevel,
63  const bool uses_foreign_storage)
64  : chunkKeyPrefix_(chunkKeyPrefix)
65  , dataMgr_(dataMgr)
66  , catalog_(catalog)
67  , physicalTableId_(physicalTableId)
68  , shard_(shard)
69  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
70  , pageSize_(pageSize)
71  , numTuples_(0)
72  , maxFragmentId_(-1)
73  , maxChunkSize_(maxChunkSize)
74  , maxRows_(maxRows)
75  , fragmenterType_("insert_order")
76  , defaultInsertLevel_(defaultInsertLevel)
77  , uses_foreign_storage_(uses_foreign_storage)
78  , hasMaterializedRowId_(false)
79  , mutex_access_inmem_states(new std::mutex) {
80  // Note that the fragmenter is not passed virtual columns, so it should only
81  // find the rowid column if it is non-virtual
82 
83  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
84  int columnId = colIt->getColumnDesc()->columnId;
85  columnMap_[columnId] = *colIt;
86  if (colIt->getColumnDesc()->columnName == "rowid") {
87  hasMaterializedRowId_ = true;
88  rowIdColId_ = columnId;
89  }
90  }
91  conditionallyInstantiateFileMgrWithParams();
92  getChunkMetadata();
93 }
94 
95 InsertOrderFragmenter::~InsertOrderFragmenter() {}
96 
97 namespace {
98 
103 int compute_device_for_fragment(const int table_id,
104  const int fragment_id,
105  const int num_devices) {
106  if (g_use_table_device_offset) {
107  return (table_id + fragment_id) % num_devices;
108  } else {
109  return fragment_id % num_devices;
110  }
111 }
112 
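The helper above spreads fragments across the available devices in round-robin fashion; when g_use_table_device_offset is set, the table id offsets the sequence so that different tables do not all start on device 0. Below is a minimal standalone sketch of the same policy; the table id, fragment ids, and device count are made up for illustration.

    // Standalone sketch of the round-robin device assignment used above.
    // The ids and device count are hypothetical.
    #include <iostream>

    int assign_device(int table_id, int fragment_id, int num_devices, bool offset_by_table) {
      // Offsetting by table id staggers tables so they do not all begin on device 0.
      return offset_by_table ? (table_id + fragment_id) % num_devices
                             : fragment_id % num_devices;
    }

    int main() {
      const int num_devices = 4;
      for (int frag = 0; frag < 6; ++frag) {
        std::cout << "table 7, fragment " << frag << " -> device "
                  << assign_device(7, frag, num_devices, /*offset_by_table=*/true) << '\n';
      }
    }
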
113 size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment,
114  const size_t num_rows_left,
115  const size_t num_rows_inserted,
116  const std::unordered_map<int, size_t>& var_len_col_info,
117  const size_t max_chunk_size,
118  const InsertChunks& insert_chunks,
119  std::map<int, Chunk_NS::Chunk>& column_map,
120  const std::vector<size_t>& valid_row_indices) {
121  size_t num_rows_to_insert = min(rows_left_in_current_fragment, num_rows_left);
122  if (rows_left_in_current_fragment != 0) {
123  for (const auto& var_len_col_info_it : var_len_col_info) {
124  CHECK_LE(var_len_col_info_it.second, max_chunk_size);
125  size_t bytes_left = max_chunk_size - var_len_col_info_it.second;
126  auto find_it = insert_chunks.chunks.find(var_len_col_info_it.first);
127  if (find_it == insert_chunks.chunks.end()) {
128  continue;
129  }
130  const auto& chunk = find_it->second;
131  auto column_type = chunk->getColumnDesc()->columnType;
132  const int8_t* index_buffer_ptr =
133  column_type.is_varlen_indeed() ? chunk->getIndexBuf()->getMemoryPtr() : nullptr;
134  CHECK(column_type.is_varlen());
135 
136  auto col_map_it = column_map.find(var_len_col_info_it.first);
137  num_rows_to_insert =
138  std::min(num_rows_to_insert,
139  col_map_it->second.getNumElemsForBytesEncodedDataAtIndices(
140  index_buffer_ptr, valid_row_indices, bytes_left));
141  }
142  }
143  return num_rows_to_insert;
144 }
145 
146 } // namespace
147 
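get_num_rows_to_insert() above starts from the number of rows that still fit in the current fragment and then shrinks that count so no variable-length column's chunk grows past max_chunk_size. The sketch below illustrates the byte-budgeting step in isolation; the per-row byte counts stand in for the encoder's getNumElemsForBytesEncodedDataAtIndices() computation and are purely hypothetical.

    // Simplified illustration of capping an insert by a variable-length byte budget.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Count how many of the pending rows fit in the remaining byte budget.
    size_t rows_that_fit(const std::vector<size_t>& row_bytes, size_t bytes_left) {
      size_t fit = 0, used = 0;
      for (size_t b : row_bytes) {
        if (used + b > bytes_left) {
          break;
        }
        used += b;
        ++fit;
      }
      return fit;
    }

    int main() {
      const size_t max_chunk_size = 64;          // hypothetical per-chunk byte cap
      const size_t bytes_already_in_chunk = 40;  // from earlier inserts
      const std::vector<size_t> pending_row_bytes{10, 8, 12, 4};  // var-len payload per row
      size_t num_rows_to_insert = pending_row_bytes.size();       // start with all rows
      num_rows_to_insert = std::min(
          num_rows_to_insert,
          rows_that_fit(pending_row_bytes, max_chunk_size - bytes_already_in_chunk));
      std::cout << "rows that fit in this fragment: " << num_rows_to_insert << '\n';  // 2
    }
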
148 void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
149  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
150  // first use by the Fragmenter, and until the maxRollbackEpochs param was added, no
151  // per-table options were set in storage
152  if (!uses_foreign_storage_ &&
153  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
154  const TableDescriptor* td =
155  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
156  File_Namespace::FileMgrParams fileMgrParams;
157  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
158  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
159  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
160  }
161 }
162 
163 void InsertOrderFragmenter::getChunkMetadata() {
164  if (uses_foreign_storage_ ||
165  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
166  // memory-resident tables won't have anything on disk
167  ChunkMetadataVector chunk_metadata;
168  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
169 
170  // data comes in as database_id, table_id, column_id, fragment_id,
171  // but let's sort by database_id, table_id, fragment_id, column_id
172 
173  int fragment_subkey_index = 3;
174  std::sort(chunk_metadata.begin(),
175  chunk_metadata.end(),
176  [&](const auto& pair1, const auto& pair2) {
177  return pair1.first[3] < pair2.first[3];
178  });
179 
180  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
181  ++chunk_itr) {
182  int cur_column_id = chunk_itr->first[2];
183  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
184 
185  if (fragmentInfoVec_.empty() ||
186  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
187  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
188  CHECK(new_fragment_info);
189  maxFragmentId_ = cur_fragment_id;
190  new_fragment_info->fragmentId = cur_fragment_id;
191  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
192  numTuples_ += new_fragment_info->getPhysicalNumTuples();
193  for (const auto level_size : dataMgr_->levelSizes_) {
194  new_fragment_info->deviceIds.push_back(
195  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
196  }
197  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
198  new_fragment_info->physicalTableId = physicalTableId_;
199  new_fragment_info->shard = shard_;
200  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
201  } else {
202  if (chunk_itr->second->numElements !=
203  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
204  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
205  std::to_string(physicalTableId_) + ", Column " +
206  std::to_string(cur_column_id) + ". Fragment Tuples: " +
207  std::to_string(
208  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
209  ", Chunk Tuples: " +
210  std::to_string(chunk_itr->second->numElements);
211  }
212  }
213  CHECK(fragmentInfoVec_.back().get());
214  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
215  }
216  }
217 
218  size_t maxFixedColSize = 0;
219 
220  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
221  auto size = colIt->second.getColumnDesc()->columnType.get_size();
222  if (size == -1) { // variable length
223  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
224  size = 8; // b/c we use this for string and array indices - gross to have magic
225  // number here
226  }
227  CHECK_GE(size, 0);
228  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
229  }
230 
231  // this is the maximum number of rows assuming every column is fixed length
232  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
233  setLastFragmentVarLenColumnSizes();
234 }
235 
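The final clamp in getChunkMetadata() guarantees that even the widest fixed-width column stays within maxChunkSize_ bytes per fragment; variable-length columns are counted as 8 bytes because only their index entries are fixed width. A worked example with hypothetical limits:

    // Hypothetical numbers showing the maxFragmentRows_ clamp.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    int main() {
      const size_t max_chunk_size = 1u << 30;       // 1 GB per chunk (illustrative)
      const size_t max_fragment_rows = 32'000'000;  // configured fragment size (illustrative)
      // Widest fixed-width column, e.g. a BIGINT; a variable-length column would
      // contribute 8 bytes for its index entries.
      const size_t max_fixed_col_size = 8;
      const size_t clamped =
          std::min(max_fragment_rows, max_chunk_size / max_fixed_col_size);
      std::cout << "effective max rows per fragment: " << clamped << '\n';  // 32000000
    }
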
236 void InsertOrderFragmenter::dropFragmentsToSize(const size_t max_rows) {
237  heavyai::unique_lock<heavyai::shared_mutex> insert_lock(insertMutex_);
238  dropFragmentsToSizeNoInsertLock(max_rows);
239 }
240 
241 void InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock(const size_t max_rows) {
242  // not safe to call from outside insertData
243  // b/c depends on insertLock around numTuples_
244 
245  // don't ever drop the only fragment!
246  if (fragmentInfoVec_.empty() ||
247  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
248  return;
249  }
250 
251  if (numTuples_ > max_rows) {
252  size_t preNumTuples = numTuples_;
253  vector<int> dropFragIds;
254  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
255  while (numTuples_ > targetRows) {
256  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
257  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
258  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
259  fragmentInfoVec_.pop_front();
260  CHECK_GE(numTuples_, numFragTuples);
261  numTuples_ -= numFragTuples;
262  }
263  deleteFragments(dropFragIds);
264  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
265  << " post: " << numTuples_ << " maxRows: " << max_rows;
266  }
267 }
268 
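dropFragmentsToSizeNoInsertLock() evicts fragments in insert order (oldest first) until the table drops to DROP_FRAGMENT_FACTOR of max_rows, leaving headroom so the very next insert does not immediately trigger another drop. A simplified sketch with made-up fragment sizes:

    // Sketch of insert-order eviction down to 97% of the row cap.
    #include <cstddef>
    #include <deque>
    #include <iostream>

    int main() {
      std::deque<size_t> fragment_sizes{100, 100, 100, 40};  // rows per fragment (made up)
      size_t num_tuples = 340;
      const size_t max_rows = 300;
      const double drop_factor = 0.97;  // mirrors DROP_FRAGMENT_FACTOR
      if (num_tuples > max_rows) {
        const size_t target = static_cast<size_t>(max_rows * drop_factor);  // 291
        while (num_tuples > target && fragment_sizes.size() > 1) {  // never drop the only fragment
          num_tuples -= fragment_sizes.front();  // evict the oldest fragment
          fragment_sizes.pop_front();
        }
      }
      std::cout << "tuples after drop: " << num_tuples << '\n';  // 240
    }
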
269 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
270  // Fix a verified loophole on sharded logical tables: the logical table is locked
271  // using its logical tableId, but it is the physical tables that reach this point when
272  // fragments overflow during COPY. Locks on a logical table and its physical tables
273  // never intersect, which opens the door to races. Resolving a logical table to its
274  // physical tables in DBHandler, ParseNode, or other higher layers where the logical
275  // table is locked with table read/write locks would be overkill; it is easier to lock
276  // the logical table for its physical tables here. A downside of this approach may be
277  // the loss of parallel execution of deleteFragments across physical tables, but since
278  // deleteFragments is a short in-memory operation, that loss is not a big deal.
279  auto chunkKeyPrefix = chunkKeyPrefix_;
280  if (shard_ >= 0) {
281  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
282  }
283 
284  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
285  // SELECT and COPY may enter a deadlock
286  const auto delete_lock =
288 
289  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
290 
291  for (const auto fragId : dropFragIds) {
292  for (const auto& col : columnMap_) {
293  int colId = col.first;
294  vector<int> fragPrefix = chunkKeyPrefix_;
295  fragPrefix.push_back(colId);
296  fragPrefix.push_back(fragId);
297  dataMgr_->deleteChunksWithPrefix(fragPrefix);
298  }
299  }
300 }
301 
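Every chunk is addressed by a key of the form {database_id, table_id, column_id, fragment_id}, so dropping a fragment amounts to deleting each key that ends with that (column_id, fragment_id) pair. The sketch below only prints the keys it would delete; the ids are hypothetical, and the real code hands each prefix to dataMgr_->deleteChunksWithPrefix().

    // Illustration of per-fragment chunk key construction; ids are made up.
    #include <iostream>
    #include <vector>

    int main() {
      const std::vector<int> chunk_key_prefix{1 /*db*/, 42 /*table*/};
      const std::vector<int> column_ids{1, 2, 3};
      const std::vector<int> drop_frag_ids{0, 1};
      for (int frag_id : drop_frag_ids) {
        for (int col_id : column_ids) {
          std::vector<int> frag_prefix = chunk_key_prefix;
          frag_prefix.push_back(col_id);
          frag_prefix.push_back(frag_id);
          std::cout << "delete chunks with prefix {" << frag_prefix[0] << ", "
                    << frag_prefix[1] << ", " << frag_prefix[2] << ", "
                    << frag_prefix[3] << "}\n";
        }
      }
    }
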
302 void InsertOrderFragmenter::updateColumnChunkMetadata(
303  const ColumnDescriptor* cd,
304  const int fragment_id,
305  const std::shared_ptr<ChunkMetadata> metadata) {
306  // synchronize concurrent accesses to fragmentInfoVec_
307  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
308 
309  CHECK(metadata.get());
310  auto fragment_info = getFragmentInfo(fragment_id);
311  CHECK(fragment_info);
312  fragment_info->setChunkMetadata(cd->columnId, metadata);
313 }
314 
315 void InsertOrderFragmenter::updateChunkStats(
316  const ColumnDescriptor* cd,
317  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
318  std::optional<Data_Namespace::MemoryLevel> memory_level) {
319  // synchronize concurrent accesses to fragmentInfoVec_
320  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
326  if (shard_ >= 0) {
327  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
328  }
329 
330  CHECK(cd);
331  const auto column_id = cd->columnId;
332  const auto col_itr = columnMap_.find(column_id);
333  CHECK(col_itr != columnMap_.end());
334 
335  for (auto const& fragment : fragmentInfoVec_) {
336  auto stats_itr = stats_map.find(fragment->fragmentId);
337  if (stats_itr != stats_map.end()) {
338  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
339  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
340  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
341  physicalTableId_,
342  column_id,
343  fragment->fragmentId};
344  auto chunk = Chunk_NS::Chunk::getChunk(cd,
345  &catalog_->getDataMgr(),
346  chunk_key,
347  memory_level.value_or(defaultInsertLevel_),
348  0,
349  chunk_meta_it->second->numBytes,
350  chunk_meta_it->second->numElements);
351  auto buf = chunk->getBuffer();
352  CHECK(buf);
353  if (!buf->hasEncoder()) {
354  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
355  }
356  auto encoder = buf->getEncoder();
357 
358  auto chunk_stats = stats_itr->second;
359 
360  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
361  encoder->getMetadata(old_chunk_metadata);
362  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
363 
364  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
365  // Use the logical type to display data, since the encoding should be ignored
366  const auto logical_ti = cd->columnType.is_dict_encoded_string()
367  ? SQLTypeInfo(kBIGINT)
368  : get_logical_type_info(cd->columnType);
369  if (!didResetStats) {
370  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
371  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
372  << DatumToString(chunk_stats.max, logical_ti);
373  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
374  << DatumToString(chunk_stats.min, logical_ti);
375  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
376  continue; // move to next fragment
377  }
378 
379  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
380  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
381  << DatumToString(chunk_stats.max, logical_ti);
382  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
383  << DatumToString(chunk_stats.min, logical_ti);
384  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
385 
386  // Reset fragment metadata map and set buffer to dirty
387  auto new_metadata = std::make_shared<ChunkMetadata>();
388  // Run through fillChunkStats to ensure any transformations to the raw metadata
389  // values get applied (e.g. for date in days)
390  encoder->getMetadata(new_metadata);
391 
392  fragment->setChunkMetadata(column_id, new_metadata);
393  fragment->shadowChunkMetadataMap =
394  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
395  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
396  buf->setDirty();
397  }
398  } else {
399  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
400  << ", table " << physicalTableId_
401  << ", column " << column_id;
402  }
403  }
404 }
405 
406 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
407  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
408  fragmentInfoVec_.end(),
409  [fragment_id](const auto& fragment) -> bool {
410  return fragment->fragmentId == fragment_id;
411  });
412  CHECK(fragment_it != fragmentInfoVec_.end());
413  return fragment_it->get();
414 }
415 
416 bool InsertOrderFragmenter::isAddingNewColumns(const InsertData& insert_data) const {
417  bool all_columns_already_exist = true, all_columns_are_new = true;
418  for (const auto column_id : insert_data.columnIds) {
419  if (columnMap_.find(column_id) == columnMap_.end()) {
420  all_columns_already_exist = false;
421  } else {
422  all_columns_are_new = false;
423  }
424  }
425  // only one should be TRUE
426  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
427  CHECK(either_all_exist_or_all_new);
428  return all_columns_are_new;
429 }
430 
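isAddingNewColumns() enforces that one insert targets either only columns the fragmenter already knows (a regular insert) or only columns it has never seen (the add-column path); a mix means the insert data is malformed. A standalone sketch of the same all-or-nothing check, using a plain std::set in place of columnMap_:

    // Sketch of the exclusive-or column check; column ids are hypothetical.
    #include <cassert>
    #include <set>
    #include <vector>

    bool is_adding_new_columns(const std::vector<int>& insert_column_ids,
                               const std::set<int>& existing_column_ids) {
      bool all_exist = true, all_new = true;
      for (int column_id : insert_column_ids) {
        if (existing_column_ids.count(column_id)) {
          all_new = false;
        } else {
          all_exist = false;
        }
      }
      assert(all_exist ^ all_new);  // a mixed set indicates malformed insert data
      return all_new;
    }

    int main() {
      const std::set<int> existing{1, 2, 3};
      assert(!is_adding_new_columns({1, 2, 3}, existing));  // regular insert path
      assert(is_adding_new_columns({4, 5}, existing));      // add-column path
    }
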
431 void InsertOrderFragmenter::insertChunks(const InsertChunks& insert_chunk) {
432  try {
433  // prevent two threads from trying to insert into the same table simultaneously
434  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
435  insertChunksImpl(insert_chunk);
436  if (defaultInsertLevel_ ==
437  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
438  dataMgr_->checkpoint(
439  chunkKeyPrefix_[0],
440  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
441  }
442  } catch (...) {
443  auto db_id = insert_chunk.db_id;
444  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
445  // the statement below deletes *this* object!
446  // relying on exception propagation at this stage
447  // until we can sort this out in a cleaner fashion
448  catalog_->setTableEpochs(db_id, table_epochs);
449  throw;
450  }
451 }
452 
453 void InsertOrderFragmenter::insertData(InsertData& insert_data_struct) {
454  try {
455  // prevent two threads from trying to insert into the same table simultaneously
456  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
457  if (!isAddingNewColumns(insert_data_struct)) {
458  insertDataImpl(insert_data_struct);
459  } else {
460  addColumns(insert_data_struct);
461  }
462  if (defaultInsertLevel_ ==
463  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
464  dataMgr_->checkpoint(
465  chunkKeyPrefix_[0],
466  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
467  }
468  } catch (...) {
469  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
470  insert_data_struct.tableId);
471  // the statement below deletes *this* object!
472  // relying on exception propagation at this stage
473  // until we can sort this out in a cleaner fashion
474  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
475  throw;
476  }
477 }
478 
479 void InsertOrderFragmenter::insertChunksNoCheckpoint(const InsertChunks& insert_chunk) {
480  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
481  heavyai::unique_lock<heavyai::shared_mutex> insertLock(
482  insertMutex_); // prevent two threads from trying to insert into the same table
483  // simultaneously
484  insertChunksImpl(insert_chunk);
485 }
486 
487 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insert_data_struct) {
488  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
489  heavyai::unique_lock<heavyai::shared_mutex> insertLock(
490  insertMutex_); // prevent two threads from trying to insert into the same table
491  // simultaneously
492  if (!isAddingNewColumns(insert_data_struct)) {
493  insertDataImpl(insert_data_struct);
494  } else {
495  addColumns(insert_data_struct);
496  }
497 }
498 
499 void InsertOrderFragmenter::addColumns(const InsertData& insertDataStruct) {
500  // synchronize concurrent accesses to fragmentInfoVec_
501  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
502  size_t numRowsLeft = insertDataStruct.numRows;
503  for (const auto columnId : insertDataStruct.columnIds) {
504  CHECK(columnMap_.end() == columnMap_.find(columnId));
505  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
506  CHECK(columnDesc);
507  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
508  }
509  try {
510  for (auto const& fragmentInfo : fragmentInfoVec_) {
511  fragmentInfo->shadowChunkMetadataMap =
512  fragmentInfo->getChunkMetadataMapPhysicalCopy();
513  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
514  size_t numRowsCanBeInserted;
515  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
516  auto columnId = insertDataStruct.columnIds[i];
517  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
518  CHECK(colDesc);
519  CHECK(columnMap_.find(columnId) != columnMap_.end());
520 
521  ChunkKey chunkKey = chunkKeyPrefix_;
522  chunkKey.push_back(columnId);
523  chunkKey.push_back(fragmentInfo->fragmentId);
524 
525  auto colMapIt = columnMap_.find(columnId);
526  auto& chunk = colMapIt->second;
527  if (chunk.isChunkOnDevice(
528  dataMgr_,
529  chunkKey,
530  defaultInsertLevel_,
531  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
532  dataMgr_->deleteChunksWithPrefix(chunkKey);
533  }
534  chunk.createChunkBuffer(
535  dataMgr_,
536  chunkKey,
537  defaultInsertLevel_,
538  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
539  chunk.initEncoder();
540 
541  try {
542  DataBlockPtr dataCopy = insertDataStruct.data[i];
543  auto size = colDesc->columnType.get_size();
544  if (0 > size) {
545  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
546  varLenColInfo_[columnId] = 0;
547  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
548  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
549  } else {
550  numRowsCanBeInserted = maxChunkSize_ / size;
551  }
552 
553  // FIXME: abort a case in which new column is wider than existing columns
554  if (numRowsCanBeInserted < numRowsToInsert) {
555  throw std::runtime_error("new column '" + colDesc->columnName +
556  "' wider than existing columns is not supported");
557  }
558 
559  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
560  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
561 
562  // update total size of var-len column in (actually the last) fragment
563  if (0 > size) {
564  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
565  varLenColInfo_[columnId] = chunk.getBuffer()->size();
566  }
567  } catch (...) {
568  dataMgr_->deleteChunksWithPrefix(chunkKey);
569  throw;
570  }
571  }
572  numRowsLeft -= numRowsToInsert;
573  }
574  CHECK(0 == numRowsLeft);
575  } catch (const std::exception& e) {
576  for (const auto columnId : insertDataStruct.columnIds) {
577  columnMap_.erase(columnId);
578  }
579  throw e;
580  }
581 
582  for (auto const& fragmentInfo : fragmentInfoVec_) {
583  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
584  }
585 }
586 
587 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
588  // prevent concurrent insert rows and drop column
589  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
590  // synchronize concurrent accesses to fragmentInfoVec_
591  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
592  for (auto const& fragmentInfo : fragmentInfoVec_) {
593  fragmentInfo->shadowChunkMetadataMap =
594  fragmentInfo->getChunkMetadataMapPhysicalCopy();
595  }
596 
597  for (const auto columnId : columnIds) {
598  auto cit = columnMap_.find(columnId);
599  if (columnMap_.end() != cit) {
600  columnMap_.erase(cit);
601  }
602 
603  vector<int> fragPrefix = chunkKeyPrefix_;
604  fragPrefix.push_back(columnId);
605  dataMgr_->deleteChunksWithPrefix(fragPrefix);
606 
607  for (const auto& fragmentInfo : fragmentInfoVec_) {
608  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
609  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
610  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
611  }
612  }
613  }
614  for (const auto& fragmentInfo : fragmentInfoVec_) {
615  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
616  }
617 }
618 
619 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
620  heavyai::shared_lock<heavyai::shared_mutex> read_lock(fragmentInfoMutex_);
621 
622  for (auto const& fragment : fragmentInfoVec_) {
623  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
624  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
625  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
626  if (chunk_stats.max.tinyintval == 1) {
627  return true;
628  }
629  }
630  return false;
631 }
632 
633 void InsertOrderFragmenter::insertChunksIntoFragment(
634  const InsertChunks& insert_chunks,
635  const std::optional<int> delete_column_id,
636  FragmentInfo* current_fragment,
637  const size_t num_rows_to_insert,
638  size_t& num_rows_inserted,
639  size_t& num_rows_left,
640  std::vector<size_t>& valid_row_indices,
641  const size_t start_fragment) {
642  heavyai::unique_lock<heavyai::shared_mutex> write_lock(fragmentInfoMutex_);
643  // for each column, append the data in the appropriate insert buffer
644  auto insert_row_indices = valid_row_indices;
645  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
646  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
647  insert_row_indices.end());
648  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
649  for (auto& [column_id, chunk] : insert_chunks.chunks) {
650  auto col_map_it = columnMap_.find(column_id);
651  CHECK(col_map_it != columnMap_.end());
652  current_fragment->shadowChunkMetadataMap[column_id] =
653  col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
654  auto var_len_col_info_it = varLenColInfo_.find(column_id);
655  if (var_len_col_info_it != varLenColInfo_.end()) {
656  var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
657  CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
658  }
659  }
660  if (hasMaterializedRowId_) {
661  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
662  current_fragment->shadowNumTuples;
663  std::vector<int64_t> row_id_data(num_rows_to_insert);
664  for (size_t i = 0; i < num_rows_to_insert; ++i) {
665  row_id_data[i] = i + start_id;
666  }
667  DataBlockPtr row_id_block;
668  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
669  auto col_map_it = columnMap_.find(rowIdColId_);
670  CHECK(col_map_it != columnMap_.end());
671  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
672  row_id_block, num_rows_to_insert, num_rows_inserted);
673  }
674 
675  if (delete_column_id) { // has delete column
676  std::vector<int8_t> delete_data(num_rows_to_insert, false);
677  DataBlockPtr delete_block;
678  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
679  auto col_map_it = columnMap_.find(*delete_column_id);
680  CHECK(col_map_it != columnMap_.end());
681  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
682  col_map_it->second.appendData(
683  delete_block, num_rows_to_insert, num_rows_inserted);
684  }
685 
686  current_fragment->shadowNumTuples =
687  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
688  num_rows_left -= num_rows_to_insert;
689  num_rows_inserted += num_rows_to_insert;
690  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
691  part_it != fragmentInfoVec_.end();
692  ++part_it) {
693  auto fragment_ptr = part_it->get();
694  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
695  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
696  }
697 
698  // truncate the first `num_rows_to_insert` rows in `valid_row_indices`
699  valid_row_indices.erase(valid_row_indices.begin(),
700  valid_row_indices.begin() + num_rows_to_insert);
701 }
702 
703 void InsertOrderFragmenter::insertChunksImpl(const InsertChunks& insert_chunks) {
704  std::optional<int> delete_column_id{std::nullopt};
705  for (const auto& cit : columnMap_) {
706  if (cit.second.getColumnDesc()->isDeletedCol) {
707  delete_column_id = cit.second.getColumnDesc()->columnId;
708  }
709  }
710 
711  // verify that all chunks to be inserted have same number of rows, otherwise the input
712  // data is malformed
713  std::optional<size_t> num_rows{std::nullopt};
714  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
715  auto buffer = chunk->getBuffer();
716  CHECK(buffer);
717  CHECK(buffer->hasEncoder());
718  if (!num_rows.has_value()) {
719  num_rows = buffer->getEncoder()->getNumElems();
720  } else {
721  CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
722  }
723  }
724 
725  auto valid_row_indices = insert_chunks.valid_row_indices;
726  size_t num_rows_left = valid_row_indices.size();
727  size_t num_rows_inserted = 0;
728 
729  if (num_rows_left == 0) {
730  return;
731  }
732 
733  FragmentInfo* current_fragment{nullptr};
734 
735  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
736  // feels fragile
737  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
738  current_fragment = createNewFragment(defaultInsertLevel_);
739  } else {
740  current_fragment = fragmentInfoVec_.back().get();
741  }
742  CHECK(current_fragment);
743 
744  size_t start_fragment = fragmentInfoVec_.size() - 1;
745 
746  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
747  // loop until done inserting all rows
748  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
749  size_t rows_left_in_current_fragment =
750  maxFragmentRows_ - current_fragment->shadowNumTuples;
751  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
752  num_rows_left,
753  num_rows_inserted,
754  varLenColInfo_,
755  maxChunkSize_,
756  insert_chunks,
757  columnMap_,
758  valid_row_indices);
759 
760  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
761  current_fragment = createNewFragment(defaultInsertLevel_);
762  if (num_rows_inserted == 0) {
763  start_fragment++;
764  }
765  rows_left_in_current_fragment = maxFragmentRows_;
766  for (auto& varLenColInfoIt : varLenColInfo_) {
767  varLenColInfoIt.second = 0; // reset byte counter
768  }
769  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
770  num_rows_left,
771  num_rows_inserted,
772  varLenColInfo_,
773  maxChunkSize_,
774  insert_chunks,
775  columnMap_,
776  valid_row_indices);
777  }
778 
779  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
780  // never be able to insert anything
781 
782  insertChunksIntoFragment(insert_chunks,
783  delete_column_id,
784  current_fragment,
785  num_rows_to_insert,
786  num_rows_inserted,
787  num_rows_left,
788  valid_row_indices,
789  start_fragment);
790  }
791  numTuples_ += *num_rows;
792  dropFragmentsToSizeNoInsertLock(maxRows_);
793 }
794 
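When the table has a materialized rowid column, insertChunksImpl() above (and insertDataImpl() below) derives each new rowid from the fragment id and the number of rows already in that fragment, so rowids are unique and monotonically increasing within a fragment. A small illustration with hypothetical sizes:

    // Hypothetical illustration of materialized rowid generation during insert.
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
      const size_t max_fragment_rows = 1000;        // illustrative fragment capacity
      const int fragment_id = 2;                    // third fragment of the table
      const size_t rows_already_in_fragment = 250;  // shadowNumTuples analogue
      const size_t num_rows_to_insert = 3;

      const size_t start_id = max_fragment_rows * fragment_id + rows_already_in_fragment;
      std::vector<int64_t> row_ids(num_rows_to_insert);
      for (size_t i = 0; i < num_rows_to_insert; ++i) {
        row_ids[i] = static_cast<int64_t>(start_id + i);  // 2250, 2251, 2252
      }
      for (auto id : row_ids) {
        std::cout << id << '\n';
      }
    }
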
795 void InsertOrderFragmenter::insertDataImpl(InsertData& insert_data) {
796  // populate deleted system column if it should exist, as it will not come from client
797  std::unique_ptr<int8_t[]> data_for_deleted_column;
798  for (const auto& cit : columnMap_) {
799  if (cit.second.getColumnDesc()->isDeletedCol) {
800  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
801  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
802  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
803  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
804  insert_data.is_default.push_back(false);
805  break;
806  }
807  }
808  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
809  std::unordered_map<int, int> inverseInsertDataColIdMap;
810  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
811  inverseInsertDataColIdMap.insert(
812  std::make_pair(insert_data.columnIds[insertId], insertId));
813  }
814 
815  size_t numRowsLeft = insert_data.numRows;
816  size_t numRowsInserted = 0;
817  vector<DataBlockPtr> dataCopy =
818  insert_data.data; // bc append data will move ptr forward and this violates
819  // constness of InsertData
820  if (numRowsLeft <= 0) {
821  return;
822  }
823 
824  FragmentInfo* currentFragment{nullptr};
825 
826  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
827  // feels fragile
828  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
829  currentFragment = createNewFragment(defaultInsertLevel_);
830  } else {
831  currentFragment = fragmentInfoVec_.back().get();
832  }
833  CHECK(currentFragment);
834 
835  size_t startFragment = fragmentInfoVec_.size() - 1;
836 
837  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
838  // loop until done inserting all rows
839  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
840  size_t rowsLeftInCurrentFragment =
841  maxFragmentRows_ - currentFragment->shadowNumTuples;
842  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
843  if (rowsLeftInCurrentFragment != 0) {
844  for (auto& varLenColInfoIt : varLenColInfo_) {
845  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
846  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
847  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
848  if (insertIdIt != inverseInsertDataColIdMap.end()) {
849  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
850  numRowsToInsert = std::min(numRowsToInsert,
851  colMapIt->second.getNumElemsForBytesInsertData(
852  dataCopy[insertIdIt->second],
853  numRowsToInsert,
854  numRowsInserted,
855  bytesLeft,
856  insert_data.is_default[insertIdIt->second]));
857  }
858  }
859  }
860 
861  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
862  currentFragment = createNewFragment(defaultInsertLevel_);
863  if (numRowsInserted == 0) {
864  startFragment++;
865  }
866  rowsLeftInCurrentFragment = maxFragmentRows_;
867  for (auto& varLenColInfoIt : varLenColInfo_) {
868  varLenColInfoIt.second = 0; // reset byte counter
869  }
870  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
871  for (auto& varLenColInfoIt : varLenColInfo_) {
872  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
873  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
874  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
875  if (insertIdIt != inverseInsertDataColIdMap.end()) {
876  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
877  numRowsToInsert = std::min(numRowsToInsert,
878  colMapIt->second.getNumElemsForBytesInsertData(
879  dataCopy[insertIdIt->second],
880  numRowsToInsert,
881  numRowsInserted,
882  bytesLeft,
883  insert_data.is_default[insertIdIt->second]));
884  }
885  }
886  }
887 
888  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
889  // never be able to insert anything
890 
891  {
892  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
893  // for each column, append the data in the appropriate insert buffer
894  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
895  int columnId = insert_data.columnIds[i];
896  auto colMapIt = columnMap_.find(columnId);
897  CHECK(colMapIt != columnMap_.end());
898  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
899  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
900  auto varLenColInfoIt = varLenColInfo_.find(columnId);
901  if (varLenColInfoIt != varLenColInfo_.end()) {
902  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
903  }
904  }
905  if (hasMaterializedRowId_) {
906  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
907  currentFragment->shadowNumTuples;
908  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
909  for (size_t i = 0; i < numRowsToInsert; ++i) {
910  row_id_data[i] = i + startId;
911  }
912  DataBlockPtr rowIdBlock;
913  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
914  auto colMapIt = columnMap_.find(rowIdColId_);
915  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
916  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
917  }
918 
919  currentFragment->shadowNumTuples =
920  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
921  numRowsLeft -= numRowsToInsert;
922  numRowsInserted += numRowsToInsert;
923  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
924  partIt != fragmentInfoVec_.end();
925  ++partIt) {
926  auto fragment_ptr = partIt->get();
927  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
928  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
929  }
930  }
931  }
932  numTuples_ += insert_data.numRows;
933  dropFragmentsToSizeNoInsertLock(maxRows_);
934 }
935 
936 FragmentInfo* InsertOrderFragmenter::createNewFragment(
937  const Data_Namespace::MemoryLevel memoryLevel) {
938  // also sets the new fragment as the insertBuffer for each column
939 
940  maxFragmentId_++;
941  auto newFragmentInfo = std::make_unique<FragmentInfo>();
942  newFragmentInfo->fragmentId = maxFragmentId_;
943  newFragmentInfo->shadowNumTuples = 0;
944  newFragmentInfo->setPhysicalNumTuples(0);
945  for (const auto levelSize : dataMgr_->levelSizes_) {
946  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
947  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
948  }
949  newFragmentInfo->physicalTableId = physicalTableId_;
950  newFragmentInfo->shard = shard_;
951 
952  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
953  colMapIt != columnMap_.end();
954  ++colMapIt) {
955  auto& chunk = colMapIt->second;
956  if (memoryLevel == Data_Namespace::MemoryLevel::CPU_LEVEL) {
957  /* At the end of this function, chunks from the previous fragment become 'rolled
958  * off'; temporary tables would otherwise lose their references to these 'rolled off'
959  * chunks and be unable to unpin them. Keep references to the 'rolled off' chunks and
960  * unpin them in ~InsertOrderFragmenter; the chunks are wrapped in unique_ptr to avoid
961  * extraneous ~Chunk calls on temporary chunks. */
962  tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
963  }
964  ChunkKey chunkKey = chunkKeyPrefix_;
965  chunkKey.push_back(chunk.getColumnDesc()->columnId);
966  chunkKey.push_back(maxFragmentId_);
967  chunk.createChunkBuffer(dataMgr_,
968  chunkKey,
969  memoryLevel,
970  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
971  pageSize_);
972  chunk.initEncoder();
973  }
974 
975  heavyai::lock_guard<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
976  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
977  return fragmentInfoVec_.back().get();
978 }
979 
980 size_t InsertOrderFragmenter::getNumFragments() {
981  heavyai::shared_lock<heavyai::shared_mutex> readLock(fragmentInfoMutex_);
982  return fragmentInfoVec_.size();
983 }
984 
985 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
986  heavyai::shared_lock<heavyai::shared_mutex> readLock(fragmentInfoMutex_);
987  TableInfo queryInfo;
988  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
989  // right now we don't test predicate, so just return (copy of) all fragments
990  bool fragmentsExist = false;
991  if (fragmentInfoVec_.empty()) {
992  // If we have no fragments add a dummy empty fragment to make the executor
993  // not have separate logic for 0-row tables
994  int maxFragmentId = 0;
995  FragmentInfo emptyFragmentInfo;
996  emptyFragmentInfo.fragmentId = maxFragmentId;
997  emptyFragmentInfo.shadowNumTuples = 0;
998  emptyFragmentInfo.setPhysicalNumTuples(0);
999  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
1000  emptyFragmentInfo.physicalTableId = physicalTableId_;
1001  emptyFragmentInfo.shard = shard_;
1002  queryInfo.fragments.push_back(emptyFragmentInfo);
1003  } else {
1004  fragmentsExist = true;
1005  std::for_each(
1006  fragmentInfoVec_.begin(),
1007  fragmentInfoVec_.end(),
1008  [&queryInfo](const auto& fragment_owned_ptr) {
1009  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
1010  });
1011  }
1012  readLock.unlock();
1013  queryInfo.setPhysicalNumTuples(0);
1014  auto partIt = queryInfo.fragments.begin();
1015  if (fragmentsExist) {
1016  while (partIt != queryInfo.fragments.end()) {
1017  if (partIt->getPhysicalNumTuples() == 0) {
1018  // this means that a concurrent insert query inserted tuples into a new fragment
1019  // but when the query came in we didn't have this fragment. To make sure we
1020  // don't mess up the executor we delete this fragment from the metadatamap
1021  // (fixes earlier bug found 2015-05-08)
1022  partIt = queryInfo.fragments.erase(partIt);
1023  } else {
1024  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1025  partIt->getPhysicalNumTuples());
1026  ++partIt;
1027  }
1028  }
1029  } else {
1030  // We added a dummy fragment and know the table is empty
1031  queryInfo.setPhysicalNumTuples(0);
1032  }
1033  return queryInfo;
1034 }
1035 
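getFragmentsForQuery() copies the fragment list under a shared lock and then prunes any fragment that still reports zero tuples, since such a fragment was created by a concurrent insert that has not yet published its row counts. A simplified sketch of that pruning pass, with made-up tuple counts:

    // Sketch of dropping empty fragments from a query snapshot and summing tuples.
    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main() {
      std::vector<size_t> fragment_tuple_counts{1000, 1000, 0, 250};  // 0 = racing insert
      size_t total_tuples = 0;
      for (auto it = fragment_tuple_counts.begin(); it != fragment_tuple_counts.end();) {
        if (*it == 0) {
          it = fragment_tuple_counts.erase(it);  // don't hand empty fragments to the executor
        } else {
          total_tuples += *it;
          ++it;
        }
      }
      std::cout << "fragments: " << fragment_tuple_counts.size()
                << ", tuples: " << total_tuples << '\n';  // fragments: 3, tuples: 2250
    }
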
1036 void InsertOrderFragmenter::resetSizesFromFragments() {
1037  heavyai::shared_lock<heavyai::shared_mutex> read_lock(fragmentInfoMutex_);
1038  numTuples_ = 0;
1039  for (const auto& fragment_info : fragmentInfoVec_) {
1040  numTuples_ += fragment_info->getPhysicalNumTuples();
1041  }
1042  setLastFragmentVarLenColumnSizes();
1043 }
1044 
1045 void InsertOrderFragmenter::setLastFragmentVarLenColumnSizes() {
1046  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1047  // Now need to get the insert buffers for each column - should be last
1048  // fragment
1049  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1050  // TODO: add accessor here for safe indexing
1051  int deviceId =
1052  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1053  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1054  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1055  insertKey.push_back(colIt->first); // column id
1056  insertKey.push_back(lastFragmentId); // fragment id
1057  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1058  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1059  if (varLenColInfoIt != varLenColInfo_.end()) {
1060  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1061  }
1062  }
1063  }
1064 }
1065 } // namespace Fragmenter_Namespace