OmniSciDB  72c90bc290
InsertOrderFragmenter.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Fragmenter/InsertOrderFragmenter.h"
18 
19 #include <algorithm>
20 #include <boost/lexical_cast.hpp>
21 #include <cassert>
22 #include <cmath>
23 #include <iostream>
24 #include <limits>
25 #include <memory>
26 #include <thread>
27 #include <type_traits>
28 
29 #include "DataMgr/AbstractBuffer.h"
31 #include "DataMgr/DataMgr.h"
33 #include "LockMgr/LockMgr.h"
34 #include "Logger/Logger.h"
35 #include "Utils/DdlUtils.h"
36 
37 #include "Shared/checked_alloc.h"
38 #include "Shared/scope.h"
39 #include "Shared/thread_count.h"
40 
41 #define DROP_FRAGMENT_FACTOR \
42  0.97 // drop to 97% of max so we don't keep adding and dropping fragments
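// For example, with maxRows = 1,000,000 dropFragmentsToSize() trims the table to
// roughly 1,000,000 * 0.97 = 970,000 tuples, so the next insert does not
// immediately push the table back over the limit and trigger another drop.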
43 
44 using Chunk_NS::Chunk;
47 
49 
50 using namespace std;
51 
52 namespace Fragmenter_Namespace {
53 
54 InsertOrderFragmenter::InsertOrderFragmenter(
55  const vector<int> chunkKeyPrefix,
56  vector<Chunk>& chunkVec,
57  Data_Namespace::DataMgr* dataMgr,
58  Catalog_Namespace::Catalog* catalog,
59  const int physicalTableId,
60  const int shard,
61  const size_t maxFragmentRows,
62  const size_t maxChunkSize,
63  const size_t pageSize,
64  const size_t maxRows,
65  const Data_Namespace::MemoryLevel defaultInsertLevel,
66  const bool uses_foreign_storage)
67  : chunkKeyPrefix_(chunkKeyPrefix)
68  , dataMgr_(dataMgr)
69  , catalog_(catalog)
70  , physicalTableId_(physicalTableId)
71  , shard_(shard)
72  , maxFragmentRows_(std::min<size_t>(maxFragmentRows, maxRows))
73  , pageSize_(pageSize)
74  , numTuples_(0)
75  , maxFragmentId_(-1)
76  , maxChunkSize_(maxChunkSize)
77  , maxRows_(maxRows)
78  , fragmenterType_("insert_order")
79  , defaultInsertLevel_(defaultInsertLevel)
80  , uses_foreign_storage_(uses_foreign_storage)
81  , hasMaterializedRowId_(false)
82  , mutex_access_inmem_states(new std::mutex) {
83  // Note that the Fragmenter is not passed virtual columns, so it should only
84  // find the rowid column if it is non-virtual (i.e. materialized)
85 
86  for (auto colIt = chunkVec.begin(); colIt != chunkVec.end(); ++colIt) {
87  int columnId = colIt->getColumnDesc()->columnId;
88  columnMap_[columnId] = *colIt;
89  if (colIt->getColumnDesc()->columnName == "rowid") {
90  hasMaterializedRowId_ = true;
91  rowIdColId_ = columnId;
92  }
93  }
94  conditionallyInstantiateFileMgrWithParams();
95  getChunkMetadata();
96 }
97 
98 InsertOrderFragmenter::~InsertOrderFragmenter() {}
99 
100 namespace {
101 
102 ChunkKey get_chunk_key(const ChunkKey& prefix, int column_id, int fragment_id) {
103  ChunkKey key = prefix; // database_id and table_id
104  key.push_back(column_id);
105  key.push_back(fragment_id); // fragment id
106  return key;
107 }
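// For example, with prefix {db_id = 1, table_id = 7}, column_id = 4 and
// fragment_id = 0, the resulting chunk key is {1, 7, 4, 0}.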
108 
109 struct ArrayElemTypeChunk {
110  ColumnDescriptor temp_cd;
111  Chunk_NS::Chunk chunk;
112 };
113 
114 void create_array_elem_type_chunk(ArrayElemTypeChunk& array_chunk,
115  const ColumnDescriptor* array_cd) {
116  array_chunk.temp_cd = *array_cd;
117  array_chunk.temp_cd.columnType = array_cd->columnType.get_elem_type();
118  array_chunk.chunk = Chunk_NS::Chunk{&array_chunk.temp_cd, true};
119 }
120 
121 class BaseAlterColumnContext {
122  public:
123  BaseAlterColumnContext(int device_id,
124  const ChunkKey& chunk_key_prefix,
125  Fragmenter_Namespace::FragmentInfo* fragment_info,
126  const ColumnDescriptor* src_cd,
127  const ColumnDescriptor* dst_cd,
128  const size_t num_elements,
129  Data_Namespace::DataMgr* data_mgr,
130  Catalog_Namespace::Catalog* catalog,
131  std::map<int, Chunk_NS::Chunk>& column_map)
132  : device_id_(device_id)
133  , chunk_key_prefix_(chunk_key_prefix)
134  , fragment_info_(fragment_info)
135  , src_cd_(src_cd)
136  , dst_cd_(dst_cd)
137  , num_elements_(num_elements)
138  , data_mgr_(data_mgr)
139  , catalog_(catalog)
140  , column_map_(column_map)
141  , buffer_(nullptr)
142  , index_buffer_(nullptr)
143  , disk_level_src_chunk_{src_cd}
144  , mem_level_src_chunk_{src_cd} {
145  key_ = get_chunk_key(chunk_key_prefix, src_cd->columnId, fragment_info->fragmentId);
146  }
147 
148  static void unpinChunk(Chunk& chunk) {
149  auto buffer = chunk.getBuffer();
150  if (buffer) {
151  buffer->unPin();
152  chunk.setBuffer(nullptr);
153  }
154 
155  auto index_buffer = chunk.getIndexBuf();
156  if (index_buffer) {
157  index_buffer->unPin();
158  chunk.setIndexBuffer(nullptr);
159  }
160  }
161 
162  void readSourceData() {
163  disk_level_src_chunk_.getChunkBuffer(
164  data_mgr_, key_, MemoryLevel::DISK_LEVEL, 0);
165  // FIXME: there appears to be a bug where if the `num_elements` is not specified
166  // below, the wrong byte count is returned for index buffers
167  mem_level_src_chunk_.getChunkBuffer(data_mgr_,
168  key_,
169  MemoryLevel::CPU_LEVEL,
170  0,
171  disk_level_src_chunk_.getBuffer()->size(),
172  num_elements_);
173  CHECK_EQ(num_elements_,
174  mem_level_src_chunk_.getBuffer()->getEncoder()->getNumElems());
175 
176  auto db_id = catalog_->getDatabaseId();
177  source = data_conversion::create_source(mem_level_src_chunk_, db_id);
178 
179  try {
180  std::tie(src_data_, std::ignore) = source->getSourceData();
181  } catch (std::exception& except) {
182  src_data_ = nullptr;
183  throw std::runtime_error("Column " + src_cd_->columnName + ": " + except.what());
184  }
185  }
186 
187  protected:
188  void createChunkScratchBuffer(Chunk_NS::Chunk& chunk) {
189  chunk.setBuffer(data_mgr_->alloc(MemoryLevel::CPU_LEVEL, 0, 0));
190  if (chunk.getColumnDesc()->columnType.is_varlen_indeed()) {
191  chunk.setIndexBuffer(data_mgr_->alloc(MemoryLevel::CPU_LEVEL, 0, 0));
192  }
193  }
194 
195  void freeChunkScratchBuffer(Chunk_NS::Chunk& chunk) {
196  data_mgr_->free(chunk.getBuffer());
197  chunk.setBuffer(nullptr);
198  if (chunk.getColumnDesc()->columnType.is_varlen_indeed()) {
199  data_mgr_->free(chunk.getIndexBuf());
200  chunk.setIndexBuffer(nullptr);
201  }
202  }
203 
204  int device_id_;
205  ChunkKey chunk_key_prefix_;
206  Fragmenter_Namespace::FragmentInfo* fragment_info_;
207  const ColumnDescriptor* src_cd_;
208  const ColumnDescriptor* dst_cd_;
209  const size_t num_elements_;
210  Data_Namespace::DataMgr* data_mgr_;
211  Catalog_Namespace::Catalog* catalog_;
212  std::map<int, Chunk_NS::Chunk>& column_map_;
213 
214  data_conversion::ConversionFactoryParam param_;
215  std::unique_ptr<data_conversion::BaseSource> source;
216  AbstractBuffer* buffer_;
217  AbstractBuffer* index_buffer_;
218  ChunkKey key_;
219  Chunk_NS::Chunk disk_level_src_chunk_;
220  Chunk_NS::Chunk mem_level_src_chunk_;
221  const int8_t* src_data_;
222  ArrayElemTypeChunk scalar_temp_chunk_;
223 };
224 
225 class GeoAlterColumnContext : public BaseAlterColumnContext {
226  public:
227  GeoAlterColumnContext(int device_id,
228  const ChunkKey& chunk_key_prefix,
229  Fragmenter_Namespace::FragmentInfo* fragment_info,
230  const ColumnDescriptor* src_cd,
231  const ColumnDescriptor* dst_cd,
232  const size_t num_elements,
233  Data_Namespace::DataMgr* data_mgr,
235  std::map<int, Chunk_NS::Chunk>& column_map,
236  const std::list<const ColumnDescriptor*>& columns)
237  : BaseAlterColumnContext(device_id,
238  chunk_key_prefix,
239  fragment_info,
240  src_cd,
241  dst_cd,
242  num_elements,
243  data_mgr,
244  catalog,
245  column_map)
246  , dst_columns_(columns) {}
247 
248  void createScratchBuffers() {
249  std::list<Chunk_NS::Chunk>& geo_chunks = param_.geo_chunks;
250  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata = param_.geo_chunk_metadata;
251  // create all geo chunk buffers
252  for (auto dst_cd : dst_columns_) {
253  geo_chunks.emplace_back(dst_cd, true);
254  auto& dst_chunk = geo_chunks.back();
255 
256  createChunkScratchBuffer(dst_chunk);
257  dst_chunk.initEncoder();
258 
259  chunk_metadata.push_back(std::make_unique<ChunkMetadata>());
260  }
261  }
262 
263  void deleteScratchBuffers() {
264  for (auto& dst_chunk : param_.geo_chunks) {
265  freeChunkScratchBuffer(dst_chunk);
266  }
267  }
268 
269  void encodeData(const bool geo_validate_geometry) {
270  auto convert_encoder =
271  data_conversion::create_string_view_encoder(param_, false, geo_validate_geometry);
272  try {
273  convert_encoder->encodeAndAppendData(src_data_, num_elements_);
274  convert_encoder->finalize(num_elements_);
275  } catch (std::exception& except) {
276  throw std::runtime_error("Column " + (*dst_columns_.begin())->columnName + ": " +
277  except.what());
278  }
279  }
280 
281  void putBuffersToDisk() {
282  auto metadata_it = param_.geo_chunk_metadata.begin();
283  auto chunk_it = param_.geo_chunks.begin();
284  for (auto dst_cd : dst_columns_) {
285  auto& chunk = *chunk_it;
286  auto& metadata = *metadata_it;
287 
288  auto encoder = chunk.getBuffer()->getEncoder();
289  CHECK(encoder);
290  encoder->resetChunkStats(metadata->chunkStats);
291  encoder->setNumElems(num_elements_);
292 
293  ChunkKey dst_key =
294  get_chunk_key(chunk_key_prefix_, dst_cd->columnId, fragment_info_->fragmentId);
295 
296  if (dst_cd->columnType.is_varlen_indeed()) {
297  auto data_key = dst_key;
298  data_key.push_back(1);
299  auto index_key = dst_key;
300  index_key.push_back(2);
301 
302  chunk.getBuffer()->setUpdated();
303  chunk.getIndexBuf()->setUpdated();
304 
305  Chunk fragmenter_chunk{dst_cd, false};
306  fragmenter_chunk.setBuffer(
307  data_mgr_->getGlobalFileMgr()->putBuffer(data_key, chunk.getBuffer()));
308  fragmenter_chunk.setIndexBuffer(
309  data_mgr_->getGlobalFileMgr()->putBuffer(index_key, chunk.getIndexBuf()));
310  column_map_[src_cd_->columnId] = fragmenter_chunk;
311 
312  } else {
313  chunk.getBuffer()->setUpdated();
314 
315  Chunk fragmenter_chunk{dst_cd, false};
316  fragmenter_chunk.setBuffer(
317  data_mgr_->getGlobalFileMgr()->putBuffer(dst_key, chunk.getBuffer()));
318  column_map_[src_cd_->columnId] = fragmenter_chunk;
319  }
320 
321  chunk_it++;
322  metadata_it++;
323  }
324  }
325 
326  private:
327  const std::list<const ColumnDescriptor*>& dst_columns_;
328 };
329 
330 class NonGeoAlterColumnContext : public BaseAlterColumnContext {
331  public:
332  NonGeoAlterColumnContext(int device_id,
333  const ChunkKey& chunk_key_prefix,
334  Fragmenter_Namespace::FragmentInfo* fragment_info,
335  const ColumnDescriptor* src_cd,
336  const ColumnDescriptor* dst_cd,
337  const size_t num_elements,
338  Data_Namespace::DataMgr* data_mgr,
339  Catalog_Namespace::Catalog* catalog_,
340  std::map<int, Chunk_NS::Chunk>& column_map)
341  : BaseAlterColumnContext(device_id,
342  chunk_key_prefix,
343  fragment_info,
344  src_cd,
345  dst_cd,
346  num_elements,
347  data_mgr,
348  catalog_,
349  column_map) {}
350 
351  void createScratchBuffers() {
352  auto db_id = catalog_->getDatabaseId();
353  param_.db_id = db_id;
354  param_.dst_chunk = Chunk_NS::Chunk{dst_cd_, true};
355  if (dst_cd_->columnType.is_array()) {
356  create_array_elem_type_chunk(scalar_temp_chunk_, dst_cd_);
357  param_.scalar_temp_chunk = scalar_temp_chunk_.chunk;
358  }
359 
360  auto& dst_chunk = param_.dst_chunk;
361 
362  createChunkScratchBuffer(dst_chunk);
363 
364  if (dst_cd_->columnType.is_array()) {
365  createChunkScratchBuffer(param_.scalar_temp_chunk);
366  }
367 
368  buffer_ = dst_chunk.getBuffer();
369  index_buffer_ = dst_chunk.getIndexBuf(); // nullptr for non-varlen types
370  }
371 
372  void deleteScratchBuffers() {
373  freeChunkScratchBuffer(param_.dst_chunk);
374  if (dst_cd_->columnType.is_array()) {
375  freeChunkScratchBuffer(param_.scalar_temp_chunk);
376  }
377  }
378 
379  void reencodeData() {
380  auto& dst_chunk = param_.dst_chunk;
381  disk_level_src_chunk_.getBuffer()->syncEncoder(dst_chunk.getBuffer());
382  if (disk_level_src_chunk_.getIndexBuf() && dst_chunk.getIndexBuf()) {
383  disk_level_src_chunk_.getIndexBuf()->syncEncoder(dst_chunk.getIndexBuf());
384  }
385 
386  dst_chunk.initEncoder();
387 
388  auto convert_encoder =
389  data_conversion::create_string_view_encoder(param_, false, false); // not geo
390 
391  try {
392  convert_encoder->encodeAndAppendData(src_data_, num_elements_);
393  convert_encoder->finalize(num_elements_);
394  } catch (std::exception& except) {
395  throw std::runtime_error("Column " + src_cd_->columnName + ": " + except.what());
396  }
397 
398  auto metadata = convert_encoder->getMetadata(
399  dst_cd_->columnType.is_array() ? param_.scalar_temp_chunk : dst_chunk);
400 
401  buffer_->getEncoder()->resetChunkStats(metadata->chunkStats);
402  buffer_->getEncoder()->setNumElems(num_elements_);
403 
404  buffer_->setUpdated();
405  if (index_buffer_) {
406  index_buffer_->setUpdated();
407  }
408  }
409 
410  void putBuffersToDisk() {
411  if (dst_cd_->columnType.is_varlen_indeed()) {
412  auto data_key = key_;
413  data_key.push_back(1);
414  auto index_key = key_;
415  index_key.push_back(2);
416 
417  Chunk fragmenter_chunk{dst_cd_, false};
418  fragmenter_chunk.setBuffer(
419  data_mgr_->getGlobalFileMgr()->putBuffer(data_key, buffer_));
420  fragmenter_chunk.setIndexBuffer(
421  data_mgr_->getGlobalFileMgr()->putBuffer(index_key, index_buffer_));
422  column_map_[src_cd_->columnId] = fragmenter_chunk;
423 
424  } else {
425  Chunk fragmenter_chunk{dst_cd_, false};
426  fragmenter_chunk.setBuffer(data_mgr_->getGlobalFileMgr()->putBuffer(key_, buffer_));
427  column_map_[src_cd_->columnId] = fragmenter_chunk;
428  }
429  }
430 };
431 
436 int compute_device_for_fragment(const int table_id,
437  const int fragment_id,
438  const int num_devices) {
439  if (g_use_table_device_offset) {
440  return (table_id + fragment_id) % num_devices;
441  } else {
442  return fragment_id % num_devices;
443  }
444 }
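// For example, with num_devices = 2, fragment 3 of table 5 maps to
// (5 + 3) % 2 = 0 when g_use_table_device_offset is set and to 3 % 2 = 1
// otherwise; the offset staggers fragments of different tables across devices
// instead of always placing fragment 0 of every table on device 0.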
445 
446 size_t get_num_rows_to_insert(const size_t rows_left_in_current_fragment,
447  const size_t num_rows_left,
448  const size_t num_rows_inserted,
449  const std::unordered_map<int, size_t>& var_len_col_info,
450  const size_t max_chunk_size,
451  const InsertChunks& insert_chunks,
452  std::map<int, Chunk_NS::Chunk>& column_map,
453  const std::vector<size_t>& valid_row_indices) {
454  size_t num_rows_to_insert = min(rows_left_in_current_fragment, num_rows_left);
455  if (rows_left_in_current_fragment != 0) {
456  for (const auto& var_len_col_info_it : var_len_col_info) {
457  CHECK_LE(var_len_col_info_it.second, max_chunk_size);
458  size_t bytes_left = max_chunk_size - var_len_col_info_it.second;
459  auto find_it = insert_chunks.chunks.find(var_len_col_info_it.first);
460  if (find_it == insert_chunks.chunks.end()) {
461  continue;
462  }
463  const auto& chunk = find_it->second;
464  auto column_type = chunk->getColumnDesc()->columnType;
465  const int8_t* index_buffer_ptr =
466  column_type.is_varlen_indeed() ? chunk->getIndexBuf()->getMemoryPtr() : nullptr;
467  CHECK(column_type.is_varlen());
468 
469  auto col_map_it = column_map.find(var_len_col_info_it.first);
470  num_rows_to_insert =
471  std::min(num_rows_to_insert,
472  col_map_it->second.getNumElemsForBytesEncodedDataAtIndices(
473  index_buffer_ptr, valid_row_indices, bytes_left));
474  }
475  }
476  return num_rows_to_insert;
477 }
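// Sketch of the sizing policy above: each variable-length column has already
// consumed var_len_col_info[col] bytes of its current chunk, leaving
// bytes_left = max_chunk_size - var_len_col_info[col]. The candidate count starts
// at min(rows_left_in_current_fragment, num_rows_left) and is then capped by how
// many of the valid rows' encoded varlen payloads still fit in bytes_left, taking
// the minimum over all varlen columns; a result of 0 tells the caller to start a
// new fragment.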
478 
479 } // namespace
480 
481 void InsertOrderFragmenter::conditionallyInstantiateFileMgrWithParams() {
482  // Somewhat awkward to do this in Fragmenter, but FileMgrs are not instantiated until
483  // first use by Fragmenter, and until maxRollbackEpochs param, no options were set in
484  // storage per table
485  if (!uses_foreign_storage_ &&
486  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
487  const TableDescriptor* td =
488  catalog_->getMetadataForTable(physicalTableId_, false /*populateFragmenter*/);
489  File_Namespace::FileMgrParams fileMgrParams;
490  fileMgrParams.max_rollback_epochs = td->maxRollbackEpochs;
491  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
492  chunkKeyPrefix_[0], chunkKeyPrefix_[1], fileMgrParams);
493  }
494 }
495 
496 void InsertOrderFragmenter::getChunkMetadata() {
497  if (uses_foreign_storage_ ||
498  defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL) {
499  // memory-resident tables won't have anything on disk
500  ChunkMetadataVector chunk_metadata;
501  dataMgr_->getChunkMetadataVecForKeyPrefix(chunk_metadata, chunkKeyPrefix_);
502 
503  // data comes like this - database_id, table_id, column_id, fragment_id
504  // but let's sort by database_id, table_id, fragment_id, column_id
505 
506  int fragment_subkey_index = 3;
507  std::sort(chunk_metadata.begin(),
508  chunk_metadata.end(),
509  [&](const auto& pair1, const auto& pair2) {
510  return pair1.first[3] < pair2.first[3];
511  });
512 
513  for (auto chunk_itr = chunk_metadata.begin(); chunk_itr != chunk_metadata.end();
514  ++chunk_itr) {
515  int cur_column_id = chunk_itr->first[2];
516  int cur_fragment_id = chunk_itr->first[fragment_subkey_index];
517 
518  if (fragmentInfoVec_.empty() ||
519  cur_fragment_id != fragmentInfoVec_.back()->fragmentId) {
520  auto new_fragment_info = std::make_unique<Fragmenter_Namespace::FragmentInfo>();
521  CHECK(new_fragment_info);
522  maxFragmentId_ = cur_fragment_id;
523  new_fragment_info->fragmentId = cur_fragment_id;
524  new_fragment_info->setPhysicalNumTuples(chunk_itr->second->numElements);
525  numTuples_ += new_fragment_info->getPhysicalNumTuples();
526  for (const auto level_size : dataMgr_->levelSizes_) {
527  new_fragment_info->deviceIds.push_back(
528  compute_device_for_fragment(physicalTableId_, cur_fragment_id, level_size));
529  }
530  new_fragment_info->shadowNumTuples = new_fragment_info->getPhysicalNumTuples();
531  new_fragment_info->physicalTableId = physicalTableId_;
532  new_fragment_info->shard = shard_;
533  fragmentInfoVec_.emplace_back(std::move(new_fragment_info));
534  } else {
535  if (chunk_itr->second->numElements !=
536  fragmentInfoVec_.back()->getPhysicalNumTuples()) {
537  LOG(FATAL) << "Inconsistency in num tuples within fragment for table " +
538  std::to_string(physicalTableId_) + ", Column " +
539  std::to_string(cur_column_id) + ". Fragment Tuples: " +
540  std::to_string(
541  fragmentInfoVec_.back()->getPhysicalNumTuples()) +
542  ", Chunk Tuples: " +
543  std::to_string(chunk_itr->second->numElements);
544  }
545  }
546  CHECK(fragmentInfoVec_.back().get());
547  fragmentInfoVec_.back().get()->setChunkMetadata(cur_column_id, chunk_itr->second);
548  }
549  }
550 
551  size_t maxFixedColSize = 0;
552 
553  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
554  auto size = colIt->second.getColumnDesc()->columnType.get_size();
555  if (size == -1) { // variable length
556  varLenColInfo_.insert(std::make_pair(colIt->first, 0));
557  size = 8; // b/c we use this for string and array indices - gross to have magic
558  // number here
559  }
560  CHECK_GE(size, 0);
561  maxFixedColSize = std::max(maxFixedColSize, static_cast<size_t>(size));
562  }
563 
564  // this is maximum number of rows assuming everything is fixed length
565  maxFragmentRows_ = std::min(maxFragmentRows_, maxChunkSize_ / maxFixedColSize);
566  setLastFragmentVarLenColumnSizes();
567 }
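// For example, with maxChunkSize_ = 1 GB (1,073,741,824 bytes) and a widest
// fixed-width column of 8 bytes, a fragment is capped at 134,217,728 rows, and
// the effective maxFragmentRows_ is the smaller of that cap and the configured
// value. Variable-length columns count as 8 bytes here because only their index
// (offset) buffers are fixed-width.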
568 
569 void InsertOrderFragmenter::dropFragmentsToSize(const size_t max_rows) {
570  heavyai::unique_lock<heavyai::shared_mutex> insert_lock(insertMutex_);
571  dropFragmentsToSizeNoInsertLock(max_rows);
572 }
573 
574 void InsertOrderFragmenter::dropFragmentsToSizeNoInsertLock(const size_t max_rows) {
575  // not safe to call from outside insertData
576  // b/c depends on insertLock around numTuples_
577 
578  // don't ever drop the only fragment!
579  if (fragmentInfoVec_.empty() ||
580  numTuples_ == fragmentInfoVec_.back()->getPhysicalNumTuples()) {
581  return;
582  }
583 
584  if (numTuples_ > max_rows) {
585  size_t preNumTuples = numTuples_;
586  vector<int> dropFragIds;
587  size_t targetRows = max_rows * DROP_FRAGMENT_FACTOR;
588  while (numTuples_ > targetRows) {
589  CHECK_GT(fragmentInfoVec_.size(), size_t(0));
590  size_t numFragTuples = fragmentInfoVec_[0]->getPhysicalNumTuples();
591  dropFragIds.push_back(fragmentInfoVec_[0]->fragmentId);
592  fragmentInfoVec_.pop_front();
593  CHECK_GE(numTuples_, numFragTuples);
594  numTuples_ -= numFragTuples;
595  }
596  deleteFragments(dropFragIds);
597  LOG(INFO) << "dropFragmentsToSize, numTuples pre: " << preNumTuples
598  << " post: " << numTuples_ << " maxRows: " << max_rows;
599  }
600 }
601 
602 void InsertOrderFragmenter::deleteFragments(const vector<int>& dropFragIds) {
603  // Fix a verified loophole on sharded tables: the logical table is locked using its
604  // logical tableId, but it is the physical tables that reach this point when fragments
605  // overflow during COPY. Locks on a logical table and its physical tables never
606  // intersect, which allows potential races. Resolving a logical table to its physical
607  // tables in DBHandler, ParseNode, or other higher layers (where the logical table is
608  // locked with Table Read/Write locks) would be overkill; it is easier to lock the
609  // logical table of a physical table here. The downside is the loss of parallel
610  // execution of deleteFragments across physical tables, but since deleteFragments is a
611  // short in-memory operation, that loss is acceptable.
612  auto chunkKeyPrefix = chunkKeyPrefix_;
613  if (shard_ >= 0) {
614  chunkKeyPrefix[1] = catalog_->getLogicalTableId(chunkKeyPrefix[1]);
615  }
616 
617  // need to keep lock seq as TableLock >> fragmentInfoMutex_ or
618  // SELECT and COPY may enter a deadlock
619  const auto delete_lock =
621 
622  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
623 
624  for (const auto fragId : dropFragIds) {
625  for (const auto& col : columnMap_) {
626  int colId = col.first;
627  vector<int> fragPrefix = chunkKeyPrefix_;
628  fragPrefix.push_back(colId);
629  fragPrefix.push_back(fragId);
630  dataMgr_->deleteChunksWithPrefix(fragPrefix);
631  }
632  }
633 }
634 
635 void InsertOrderFragmenter::updateColumnChunkMetadata(
636  const ColumnDescriptor* cd,
637  const int fragment_id,
638  const std::shared_ptr<ChunkMetadata> metadata) {
639  // synchronize concurrent accesses to fragmentInfoVec_
640  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
641 
642  CHECK(metadata.get());
643  auto fragment_info = getFragmentInfo(fragment_id);
644  CHECK(fragment_info);
645  fragment_info->setChunkMetadata(cd->columnId, metadata);
646 }
647 
648 void InsertOrderFragmenter::updateChunkStats(
649  const ColumnDescriptor* cd,
650  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
651  std::optional<Data_Namespace::MemoryLevel> memory_level) {
652  // synchronize concurrent accesses to fragmentInfoVec_
653  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
659  if (shard_ >= 0) {
660  LOG(WARNING) << "Skipping chunk stats update for logical table " << physicalTableId_;
661  }
662 
663  CHECK(cd);
664  const auto column_id = cd->columnId;
665  const auto col_itr = columnMap_.find(column_id);
666  CHECK(col_itr != columnMap_.end());
667 
668  for (auto const& fragment : fragmentInfoVec_) {
669  auto stats_itr = stats_map.find(fragment->fragmentId);
670  if (stats_itr != stats_map.end()) {
671  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(column_id);
672  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
673  ChunkKey chunk_key{catalog_->getCurrentDB().dbId,
674  physicalTableId_,
675  column_id,
676  fragment->fragmentId};
677  auto chunk = Chunk_NS::Chunk::getChunk(cd,
678  &catalog_->getDataMgr(),
679  chunk_key,
680  memory_level.value_or(defaultInsertLevel_),
681  0,
682  chunk_meta_it->second->numBytes,
683  chunk_meta_it->second->numElements);
684  auto buf = chunk->getBuffer();
685  CHECK(buf);
686  if (!buf->hasEncoder()) {
687  throw std::runtime_error("No encoder for chunk " + show_chunk(chunk_key));
688  }
689  auto encoder = buf->getEncoder();
690 
691  auto chunk_stats = stats_itr->second;
692 
693  auto old_chunk_metadata = std::make_shared<ChunkMetadata>();
694  encoder->getMetadata(old_chunk_metadata);
695  auto& old_chunk_stats = old_chunk_metadata->chunkStats;
696 
697  const bool didResetStats = encoder->resetChunkStats(chunk_stats);
698  // Use the logical type to display data, since the encoding should be ignored
699  const auto logical_ti = cd->columnType.is_dict_encoded_string()
700  ? SQLTypeInfo(kBIGINT)
701  : get_logical_type_info(cd->columnType);
702  if (!didResetStats) {
703  VLOG(3) << "Skipping chunk stats reset for " << show_chunk(chunk_key);
704  VLOG(3) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
705  << DatumToString(chunk_stats.max, logical_ti);
706  VLOG(3) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
707  << DatumToString(chunk_stats.min, logical_ti);
708  VLOG(3) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
709  continue; // move to next fragment
710  }
711 
712  VLOG(2) << "Resetting chunk stats for " << show_chunk(chunk_key);
713  VLOG(2) << "Max: " << DatumToString(old_chunk_stats.max, logical_ti) << " -> "
714  << DatumToString(chunk_stats.max, logical_ti);
715  VLOG(2) << "Min: " << DatumToString(old_chunk_stats.min, logical_ti) << " -> "
716  << DatumToString(chunk_stats.min, logical_ti);
717  VLOG(2) << "Nulls: " << (chunk_stats.has_nulls ? "True" : "False");
718 
719  // Reset fragment metadata map and set buffer to dirty
720  auto new_metadata = std::make_shared<ChunkMetadata>();
721  // Run through fillChunkStats to ensure any transformations to the raw metadata
722  // values get applied (e.g. for date in days)
723  encoder->getMetadata(new_metadata);
724 
725  fragment->setChunkMetadata(column_id, new_metadata);
726  fragment->shadowChunkMetadataMap =
727  fragment->getChunkMetadataMapPhysicalCopy(); // TODO(adb): needed?
728  if (defaultInsertLevel_ == Data_Namespace::DISK_LEVEL) {
729  buf->setDirty();
730  }
731  } else {
732  LOG(WARNING) << "No chunk stats update found for fragment " << fragment->fragmentId
733  << ", table " << physicalTableId_
734  << ", column " << column_id;
735  }
736  }
737 }
738 
739 FragmentInfo* InsertOrderFragmenter::getFragmentInfo(const int fragment_id) const {
740  auto fragment_it = std::find_if(fragmentInfoVec_.begin(),
741  fragmentInfoVec_.end(),
742  [fragment_id](const auto& fragment) -> bool {
743  return fragment->fragmentId == fragment_id;
744  });
745  CHECK(fragment_it != fragmentInfoVec_.end());
746  return fragment_it->get();
747 }
748 
749 bool InsertOrderFragmenter::isAddingNewColumns(const InsertData& insert_data) const {
750  bool all_columns_already_exist = true, all_columns_are_new = true;
751  for (const auto column_id : insert_data.columnIds) {
752  if (columnMap_.find(column_id) == columnMap_.end()) {
753  all_columns_already_exist = false;
754  } else {
755  all_columns_are_new = false;
756  }
757  }
758  // only one should be TRUE
759  bool either_all_exist_or_all_new = all_columns_already_exist ^ all_columns_are_new;
760  CHECK(either_all_exist_or_all_new);
761  return all_columns_are_new;
762 }
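// The XOR above enforces an all-or-nothing invariant: a single InsertData must
// reference either only columns that already have chunks (a normal append) or
// only brand-new column ids (the backfill path handled by addColumns()); a mix
// of the two trips the CHECK.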
763 
764 void InsertOrderFragmenter::insertChunks(const InsertChunks& insert_chunk) {
765  try {
766  // prevent two threads from trying to insert into the same table simultaneously
767  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
768  insertChunksImpl(insert_chunk);
769  if (defaultInsertLevel_ ==
770  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
771  dataMgr_->checkpoint(
772  chunkKeyPrefix_[0],
773  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
774  }
775  } catch (...) {
776  auto db_id = insert_chunk.db_id;
777  auto table_epochs = catalog_->getTableEpochs(db_id, insert_chunk.table_id);
778  // the statement below deletes *this* object!
779  // relying on exception propagation at this stage
780  // until we can sort this out in a cleaner fashion
781  catalog_->setTableEpochs(db_id, table_epochs);
782  throw;
783  }
784 }
785 
786 void InsertOrderFragmenter::insertData(InsertData& insert_data_struct) {
787  try {
788  // prevent two threads from trying to insert into the same table simultaneously
789  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
790  if (!isAddingNewColumns(insert_data_struct)) {
791  insertDataImpl(insert_data_struct);
792  } else {
793  addColumns(insert_data_struct);
794  }
795  if (defaultInsertLevel_ ==
796  Data_Namespace::DISK_LEVEL) { // only checkpoint if data is resident on disk
797  dataMgr_->checkpoint(
798  chunkKeyPrefix_[0],
799  chunkKeyPrefix_[1]); // need to checkpoint here to remove window for corruption
800  }
801  } catch (...) {
802  auto table_epochs = catalog_->getTableEpochs(insert_data_struct.databaseId,
803  insert_data_struct.tableId);
804  // the statement below deletes *this* object!
805  // relying on exception propagation at this stage
806  // until we can sort this out in a cleaner fashion
807  catalog_->setTableEpochs(insert_data_struct.databaseId, table_epochs);
808  throw;
809  }
810 }
811 
812 void InsertOrderFragmenter::insertChunksNoCheckpoint(const InsertChunks& insert_chunk) {
813  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
814  heavyai::unique_lock<heavyai::shared_mutex> insertLock(
815  insertMutex_); // prevent two threads from trying to insert into the same table
816  // simultaneously
817  insertChunksImpl(insert_chunk);
818 }
819 
820 void InsertOrderFragmenter::insertDataNoCheckpoint(InsertData& insert_data_struct) {
821  // TODO: this local lock will need to be centralized when ALTER COLUMN is added, bc
822  heavyai::unique_lock<heavyai::shared_mutex> insertLock(
823  insertMutex_); // prevent two threads from trying to insert into the same table
824  // simultaneously
825  if (!isAddingNewColumns(insert_data_struct)) {
826  insertDataImpl(insert_data_struct);
827  } else {
828  addColumns(insert_data_struct);
829  }
830 }
831 
832 void InsertOrderFragmenter::addColumns(const InsertData& insertDataStruct) {
833  // synchronize concurrent accesses to fragmentInfoVec_
834  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
835  size_t numRowsLeft = insertDataStruct.numRows;
836  for (const auto columnId : insertDataStruct.columnIds) {
837  CHECK(columnMap_.end() == columnMap_.find(columnId));
838  const auto columnDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
839  CHECK(columnDesc);
840  columnMap_.emplace(columnId, Chunk_NS::Chunk(columnDesc));
841  }
842  try {
843  for (auto const& fragmentInfo : fragmentInfoVec_) {
844  fragmentInfo->shadowChunkMetadataMap =
845  fragmentInfo->getChunkMetadataMapPhysicalCopy();
846  auto numRowsToInsert = fragmentInfo->getPhysicalNumTuples(); // not getNumTuples()
847  size_t numRowsCanBeInserted;
848  for (size_t i = 0; i < insertDataStruct.columnIds.size(); i++) {
849  auto columnId = insertDataStruct.columnIds[i];
850  auto colDesc = catalog_->getMetadataForColumn(physicalTableId_, columnId);
851  CHECK(colDesc);
852  CHECK(columnMap_.find(columnId) != columnMap_.end());
853 
854  ChunkKey chunkKey = chunkKeyPrefix_;
855  chunkKey.push_back(columnId);
856  chunkKey.push_back(fragmentInfo->fragmentId);
857 
858  auto colMapIt = columnMap_.find(columnId);
859  auto& chunk = colMapIt->second;
860  if (chunk.isChunkOnDevice(
861  dataMgr_,
862  chunkKey,
863  defaultInsertLevel_,
864  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)])) {
865  dataMgr_->deleteChunksWithPrefix(chunkKey);
866  }
867  chunk.createChunkBuffer(
868  dataMgr_,
869  chunkKey,
870  defaultInsertLevel_,
871  fragmentInfo->deviceIds[static_cast<int>(defaultInsertLevel_)]);
872  chunk.initEncoder();
873 
874  try {
875  DataBlockPtr dataCopy = insertDataStruct.data[i];
876  auto size = colDesc->columnType.get_size();
877  if (0 > size) {
878  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
879  varLenColInfo_[columnId] = 0;
880  numRowsCanBeInserted = chunk.getNumElemsForBytesInsertData(
881  dataCopy, numRowsToInsert, 0, maxChunkSize_, true);
882  } else {
883  numRowsCanBeInserted = maxChunkSize_ / size;
884  }
885 
886  // FIXME: reject the case in which the new column is wider than the existing columns
887  if (numRowsCanBeInserted < numRowsToInsert) {
888  throw std::runtime_error("new column '" + colDesc->columnName +
889  "' wider than existing columns is not supported");
890  }
891 
892  auto chunkMetadata = chunk.appendData(dataCopy, numRowsToInsert, 0, true);
893  fragmentInfo->shadowChunkMetadataMap[columnId] = chunkMetadata;
894 
895  // update total size of var-len column in (actually the last) fragment
896  if (0 > size) {
897  std::unique_lock<std::mutex> lck(*mutex_access_inmem_states);
898  varLenColInfo_[columnId] = chunk.getBuffer()->size();
899  }
900  } catch (...) {
901  dataMgr_->deleteChunksWithPrefix(chunkKey);
902  throw;
903  }
904  }
905  numRowsLeft -= numRowsToInsert;
906  }
907  CHECK(0 == numRowsLeft);
908  } catch (const std::exception& e) {
909  for (const auto columnId : insertDataStruct.columnIds) {
910  columnMap_.erase(columnId);
911  }
912  throw;  // rethrow the original exception rather than a sliced std::exception copy
913  }
914 
915  for (auto const& fragmentInfo : fragmentInfoVec_) {
916  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
917  }
918 }
919 
920 void InsertOrderFragmenter::dropColumns(const std::vector<int>& columnIds) {
921  // prevent concurrent insert rows and drop column
922  heavyai::unique_lock<heavyai::shared_mutex> insertLock(insertMutex_);
923  // synchronize concurrent accesses to fragmentInfoVec_
924  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
925  for (auto const& fragmentInfo : fragmentInfoVec_) {
926  fragmentInfo->shadowChunkMetadataMap =
927  fragmentInfo->getChunkMetadataMapPhysicalCopy();
928  }
929 
930  for (const auto columnId : columnIds) {
931  auto cit = columnMap_.find(columnId);
932  if (columnMap_.end() != cit) {
933  columnMap_.erase(cit);
934  }
935 
936  vector<int> fragPrefix = chunkKeyPrefix_;
937  fragPrefix.push_back(columnId);
938  dataMgr_->deleteChunksWithPrefix(fragPrefix);
939 
940  for (const auto& fragmentInfo : fragmentInfoVec_) {
941  auto cmdit = fragmentInfo->shadowChunkMetadataMap.find(columnId);
942  if (fragmentInfo->shadowChunkMetadataMap.end() != cmdit) {
943  fragmentInfo->shadowChunkMetadataMap.erase(cmdit);
944  }
945  }
946  }
947  for (const auto& fragmentInfo : fragmentInfoVec_) {
948  fragmentInfo->setChunkMetadataMap(fragmentInfo->shadowChunkMetadataMap);
949  }
950 }
951 
952 bool InsertOrderFragmenter::hasDeletedRows(const int delete_column_id) {
953  heavyai::shared_lock<heavyai::shared_mutex> read_lock(fragmentInfoMutex_);
954 
955  for (auto const& fragment : fragmentInfoVec_) {
956  auto chunk_meta_it = fragment->getChunkMetadataMapPhysical().find(delete_column_id);
957  CHECK(chunk_meta_it != fragment->getChunkMetadataMapPhysical().end());
958  const auto& chunk_stats = chunk_meta_it->second->chunkStats;
959  if (chunk_stats.max.tinyintval == 1) {
960  return true;
961  }
962  }
963  return false;
964 }
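// The delete column stores one tinyint flag per row, so a chunk-stats maximum of
// 1 in any fragment means at least one row there has been marked deleted, while
// a maximum of 0 across all fragments means the table has no soft-deleted rows.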
965 
966 void InsertOrderFragmenter::insertChunksIntoFragment(
967  const InsertChunks& insert_chunks,
968  const std::optional<int> delete_column_id,
969  FragmentInfo* current_fragment,
970  const size_t num_rows_to_insert,
971  size_t& num_rows_inserted,
972  size_t& num_rows_left,
973  std::vector<size_t>& valid_row_indices,
974  const size_t start_fragment) {
975  heavyai::unique_lock<heavyai::shared_mutex> write_lock(fragmentInfoMutex_);
976  // for each column, append the data in the appropriate insert buffer
977  auto insert_row_indices = valid_row_indices;
978  CHECK_GE(insert_row_indices.size(), num_rows_to_insert);
979  insert_row_indices.erase(insert_row_indices.begin() + num_rows_to_insert,
980  insert_row_indices.end());
981  CHECK_EQ(insert_row_indices.size(), num_rows_to_insert);
982  for (auto& [column_id, chunk] : insert_chunks.chunks) {
983  auto col_map_it = columnMap_.find(column_id);
984  CHECK(col_map_it != columnMap_.end());
985  current_fragment->shadowChunkMetadataMap[column_id] =
986  col_map_it->second.appendEncodedDataAtIndices(*chunk, insert_row_indices);
987  auto var_len_col_info_it = varLenColInfo_.find(column_id);
988  if (var_len_col_info_it != varLenColInfo_.end()) {
989  var_len_col_info_it->second = col_map_it->second.getBuffer()->size();
990  CHECK_LE(var_len_col_info_it->second, maxChunkSize_);
991  }
992  }
993  if (hasMaterializedRowId_) {
994  size_t start_id = maxFragmentRows_ * current_fragment->fragmentId +
995  current_fragment->shadowNumTuples;
996  std::vector<int64_t> row_id_data(num_rows_to_insert);
997  for (size_t i = 0; i < num_rows_to_insert; ++i) {
998  row_id_data[i] = i + start_id;
999  }
1000  DataBlockPtr row_id_block;
1001  row_id_block.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.data());
1002  auto col_map_it = columnMap_.find(rowIdColId_);
1003  CHECK(col_map_it != columnMap_.end());
1004  current_fragment->shadowChunkMetadataMap[rowIdColId_] = col_map_it->second.appendData(
1005  row_id_block, num_rows_to_insert, num_rows_inserted);
1006  }
1007 
1008  if (delete_column_id) { // has delete column
1009  std::vector<int8_t> delete_data(num_rows_to_insert, false);
1010  DataBlockPtr delete_block;
1011  delete_block.numbersPtr = reinterpret_cast<int8_t*>(delete_data.data());
1012  auto col_map_it = columnMap_.find(*delete_column_id);
1013  CHECK(col_map_it != columnMap_.end());
1014  current_fragment->shadowChunkMetadataMap[*delete_column_id] =
1015  col_map_it->second.appendData(
1016  delete_block, num_rows_to_insert, num_rows_inserted);
1017  }
1018 
1019  current_fragment->shadowNumTuples =
1020  fragmentInfoVec_.back()->getPhysicalNumTuples() + num_rows_to_insert;
1021  num_rows_left -= num_rows_to_insert;
1022  num_rows_inserted += num_rows_to_insert;
1023  for (auto part_it = fragmentInfoVec_.begin() + start_fragment;
1024  part_it != fragmentInfoVec_.end();
1025  ++part_it) {
1026  auto fragment_ptr = part_it->get();
1027  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1028  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
1029  }
1030 
1031  // truncate the first `num_rows_to_insert` rows in `valid_row_indices`
1032  valid_row_indices.erase(valid_row_indices.begin(),
1033  valid_row_indices.begin() + num_rows_to_insert);
1034 }
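// Rowid assignment above is purely positional: for example, with
// maxFragmentRows_ = 32,000,000, the first row appended to fragment 2 while it
// already holds 100 tuples receives rowid 2 * 32,000,000 + 100 = 64,000,100, and
// the rest of the batch gets consecutive ids.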
1035 
1036 void InsertOrderFragmenter::insertChunksImpl(const InsertChunks& insert_chunks) {
1037  std::optional<int> delete_column_id{std::nullopt};
1038  for (const auto& cit : columnMap_) {
1039  if (cit.second.getColumnDesc()->isDeletedCol) {
1040  delete_column_id = cit.second.getColumnDesc()->columnId;
1041  }
1042  }
1043 
1044  // verify that all chunks to be inserted have same number of rows, otherwise the input
1045  // data is malformed
1046  std::optional<size_t> num_rows{std::nullopt};
1047  for (const auto& [column_id, chunk] : insert_chunks.chunks) {
1048  auto buffer = chunk->getBuffer();
1049  CHECK(buffer);
1050  CHECK(buffer->hasEncoder());
1051  if (!num_rows.has_value()) {
1052  num_rows = buffer->getEncoder()->getNumElems();
1053  } else {
1054  CHECK_EQ(num_rows.value(), buffer->getEncoder()->getNumElems());
1055  }
1056  }
1057 
1058  auto valid_row_indices = insert_chunks.valid_row_indices;
1059  size_t num_rows_left = valid_row_indices.size();
1060  size_t num_rows_inserted = 0;
1061 
1062  if (num_rows_left == 0) {
1063  return;
1064  }
1065 
1066  FragmentInfo* current_fragment{nullptr};
1067 
1068  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
1069  // feels fragile
1070  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
1071  current_fragment = createNewFragment(defaultInsertLevel_);
1072  } else {
1073  current_fragment = fragmentInfoVec_.back().get();
1074  }
1075  CHECK(current_fragment);
1076 
1077  size_t start_fragment = fragmentInfoVec_.size() - 1;
1078 
1079  while (num_rows_left > 0) { // may have to create multiple fragments for bulk insert
1080  // loop until done inserting all rows
1081  CHECK_LE(current_fragment->shadowNumTuples, maxFragmentRows_);
1082  size_t rows_left_in_current_fragment =
1083  maxFragmentRows_ - current_fragment->shadowNumTuples;
1084  size_t num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
1085  num_rows_left,
1086  num_rows_inserted,
1087  varLenColInfo_,
1088  maxChunkSize_,
1089  insert_chunks,
1090  columnMap_,
1091  valid_row_indices);
1092 
1093  if (rows_left_in_current_fragment == 0 || num_rows_to_insert == 0) {
1094  current_fragment = createNewFragment(defaultInsertLevel_);
1095  if (num_rows_inserted == 0) {
1096  start_fragment++;
1097  }
1098  rows_left_in_current_fragment = maxFragmentRows_;
1099  for (auto& varLenColInfoIt : varLenColInfo_) {
1100  varLenColInfoIt.second = 0; // reset byte counter
1101  }
1102  num_rows_to_insert = get_num_rows_to_insert(rows_left_in_current_fragment,
1103  num_rows_left,
1104  num_rows_inserted,
1105  varLenColInfo_,
1106  maxChunkSize_,
1107  insert_chunks,
1108  columnMap_,
1109  valid_row_indices);
1110  }
1111 
1112  CHECK_GT(num_rows_to_insert, size_t(0)); // would put us into an endless loop as we'd
1113  // never be able to insert anything
1114 
1115  insertChunksIntoFragment(insert_chunks,
1116  delete_column_id,
1117  current_fragment,
1118  num_rows_to_insert,
1119  num_rows_inserted,
1120  num_rows_left,
1121  valid_row_indices,
1122  start_fragment);
1123  }
1124  numTuples_ += *num_rows;
1125  dropFragmentsToSizeNoInsertLock(maxRows_);
1126 }
1127 
1128 void InsertOrderFragmenter::insertDataImpl(InsertData& insert_data) {
1129  // populate deleted system column if it should exist, as it will not come from client
1130  std::unique_ptr<int8_t[]> data_for_deleted_column;
1131  for (const auto& cit : columnMap_) {
1132  if (cit.second.getColumnDesc()->isDeletedCol) {
1133  data_for_deleted_column.reset(new int8_t[insert_data.numRows]);
1134  memset(data_for_deleted_column.get(), 0, insert_data.numRows);
1135  insert_data.data.emplace_back(DataBlockPtr{data_for_deleted_column.get()});
1136  insert_data.columnIds.push_back(cit.second.getColumnDesc()->columnId);
1137  insert_data.is_default.push_back(false);
1138  break;
1139  }
1140  }
1141  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
1142  std::unordered_map<int, int> inverseInsertDataColIdMap;
1143  for (size_t insertId = 0; insertId < insert_data.columnIds.size(); ++insertId) {
1144  inverseInsertDataColIdMap.insert(
1145  std::make_pair(insert_data.columnIds[insertId], insertId));
1146  }
1147 
1148  size_t numRowsLeft = insert_data.numRows;
1149  size_t numRowsInserted = 0;
1150  vector<DataBlockPtr> dataCopy =
1151  insert_data.data; // bc append data will move ptr forward and this violates
1152  // constness of InsertData
1153  if (numRowsLeft <= 0) {
1154  return;
1155  }
1156 
1157  FragmentInfo* currentFragment{nullptr};
1158 
1159  // Access to fragmentInfoVec_ is protected as we are under the insertMutex_ lock but it
1160  // feels fragile
1161  if (fragmentInfoVec_.empty()) { // if no fragments exist for table
1162  currentFragment = createNewFragment(defaultInsertLevel_);
1163  } else {
1164  currentFragment = fragmentInfoVec_.back().get();
1165  }
1166  CHECK(currentFragment);
1167 
1168  size_t startFragment = fragmentInfoVec_.size() - 1;
1169 
1170  while (numRowsLeft > 0) { // may have to create multiple fragments for bulk insert
1171  // loop until done inserting all rows
1172  CHECK_LE(currentFragment->shadowNumTuples, maxFragmentRows_);
1173  size_t rowsLeftInCurrentFragment =
1174  maxFragmentRows_ - currentFragment->shadowNumTuples;
1175  size_t numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
1176  if (rowsLeftInCurrentFragment != 0) {
1177  for (auto& varLenColInfoIt : varLenColInfo_) {
1178  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1179  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1180  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1181  if (insertIdIt != inverseInsertDataColIdMap.end()) {
1182  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1183  numRowsToInsert = std::min(numRowsToInsert,
1184  colMapIt->second.getNumElemsForBytesInsertData(
1185  dataCopy[insertIdIt->second],
1186  numRowsToInsert,
1187  numRowsInserted,
1188  bytesLeft,
1189  insert_data.is_default[insertIdIt->second]));
1190  }
1191  }
1192  }
1193 
1194  if (rowsLeftInCurrentFragment == 0 || numRowsToInsert == 0) {
1195  currentFragment = createNewFragment(defaultInsertLevel_);
1196  if (numRowsInserted == 0) {
1197  startFragment++;
1198  }
1199  rowsLeftInCurrentFragment = maxFragmentRows_;
1200  for (auto& varLenColInfoIt : varLenColInfo_) {
1201  varLenColInfoIt.second = 0; // reset byte counter
1202  }
1203  numRowsToInsert = min(rowsLeftInCurrentFragment, numRowsLeft);
1204  for (auto& varLenColInfoIt : varLenColInfo_) {
1205  CHECK_LE(varLenColInfoIt.second, maxChunkSize_);
1206  size_t bytesLeft = maxChunkSize_ - varLenColInfoIt.second;
1207  auto insertIdIt = inverseInsertDataColIdMap.find(varLenColInfoIt.first);
1208  if (insertIdIt != inverseInsertDataColIdMap.end()) {
1209  auto colMapIt = columnMap_.find(varLenColInfoIt.first);
1210  numRowsToInsert = std::min(numRowsToInsert,
1211  colMapIt->second.getNumElemsForBytesInsertData(
1212  dataCopy[insertIdIt->second],
1213  numRowsToInsert,
1214  numRowsInserted,
1215  bytesLeft,
1216  insert_data.is_default[insertIdIt->second]));
1217  }
1218  }
1219  }
1220 
1221  CHECK_GT(numRowsToInsert, size_t(0)); // would put us into an endless loop as we'd
1222  // never be able to insert anything
1223 
1224  {
1225  heavyai::unique_lock<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
1226  // for each column, append the data in the appropriate insert buffer
1227  for (size_t i = 0; i < insert_data.columnIds.size(); ++i) {
1228  int columnId = insert_data.columnIds[i];
1229  auto colMapIt = columnMap_.find(columnId);
1230  CHECK(colMapIt != columnMap_.end());
1231  currentFragment->shadowChunkMetadataMap[columnId] = colMapIt->second.appendData(
1232  dataCopy[i], numRowsToInsert, numRowsInserted, insert_data.is_default[i]);
1233  auto varLenColInfoIt = varLenColInfo_.find(columnId);
1234  if (varLenColInfoIt != varLenColInfo_.end()) {
1235  varLenColInfoIt->second = colMapIt->second.getBuffer()->size();
1236  }
1237  }
1238  if (hasMaterializedRowId_) {
1239  size_t startId = maxFragmentRows_ * currentFragment->fragmentId +
1240  currentFragment->shadowNumTuples;
1241  auto row_id_data = std::make_unique<int64_t[]>(numRowsToInsert);
1242  for (size_t i = 0; i < numRowsToInsert; ++i) {
1243  row_id_data[i] = i + startId;
1244  }
1245  DataBlockPtr rowIdBlock;
1246  rowIdBlock.numbersPtr = reinterpret_cast<int8_t*>(row_id_data.get());
1247  auto colMapIt = columnMap_.find(rowIdColId_);
1248  currentFragment->shadowChunkMetadataMap[rowIdColId_] =
1249  colMapIt->second.appendData(rowIdBlock, numRowsToInsert, numRowsInserted);
1250  }
1251 
1252  currentFragment->shadowNumTuples =
1253  fragmentInfoVec_.back()->getPhysicalNumTuples() + numRowsToInsert;
1254  numRowsLeft -= numRowsToInsert;
1255  numRowsInserted += numRowsToInsert;
1256  for (auto partIt = fragmentInfoVec_.begin() + startFragment;
1257  partIt != fragmentInfoVec_.end();
1258  ++partIt) {
1259  auto fragment_ptr = partIt->get();
1260  fragment_ptr->setPhysicalNumTuples(fragment_ptr->shadowNumTuples);
1261  fragment_ptr->setChunkMetadataMap(fragment_ptr->shadowChunkMetadataMap);
1262  }
1263  }
1264  }
1265  numTuples_ += insert_data.numRows;
1266  dropFragmentsToSizeNoInsertLock(maxRows_);
1267 }
1268 
1269 FragmentInfo* InsertOrderFragmenter::createNewFragment(
1270  const Data_Namespace::MemoryLevel memoryLevel) {
1271  // also sets the new fragment as the insertBuffer for each column
1272 
1273  maxFragmentId_++;
1274  auto newFragmentInfo = std::make_unique<FragmentInfo>();
1275  newFragmentInfo->fragmentId = maxFragmentId_;
1276  newFragmentInfo->shadowNumTuples = 0;
1277  newFragmentInfo->setPhysicalNumTuples(0);
1278  for (const auto levelSize : dataMgr_->levelSizes_) {
1279  newFragmentInfo->deviceIds.push_back(compute_device_for_fragment(
1280  physicalTableId_, newFragmentInfo->fragmentId, levelSize));
1281  }
1282  newFragmentInfo->physicalTableId = physicalTableId_;
1283  newFragmentInfo->shard = shard_;
1284 
1285  for (map<int, Chunk>::iterator colMapIt = columnMap_.begin();
1286  colMapIt != columnMap_.end();
1287  ++colMapIt) {
1288  auto& chunk = colMapIt->second;
1289  if (memoryLevel == Data_Namespace::MemoryLevel::CPU_LEVEL) {
1290  /* At the end of this function chunks from the previous fragment become 'rolled
1291  * off', temporary tables will lose reference to any 'rolled off' chunks and are
1292  * not able to unpin these chunks. Keep reference to 'rolled off' chunks and unpin
1293  * at ~InsertOrderFragmenter, chunks wrapped by unique_ptr to avoid extraneous
1294  * ~Chunk calls with temporary chunks.*/
1295  tracked_in_memory_chunks_.emplace_back(std::make_unique<Chunk_NS::Chunk>(chunk));
1296  }
1297  ChunkKey chunkKey = chunkKeyPrefix_;
1298  chunkKey.push_back(chunk.getColumnDesc()->columnId);
1299  chunkKey.push_back(maxFragmentId_);
1300  chunk.createChunkBuffer(dataMgr_,
1301  chunkKey,
1302  memoryLevel,
1303  newFragmentInfo->deviceIds[static_cast<int>(memoryLevel)],
1304  pageSize_);
1305  chunk.initEncoder();
1306  }
1307 
1308  heavyai::lock_guard<heavyai::shared_mutex> writeLock(fragmentInfoMutex_);
1309  fragmentInfoVec_.push_back(std::move(newFragmentInfo));
1310  return fragmentInfoVec_.back().get();
1311 }
1312 
1313 size_t InsertOrderFragmenter::getNumFragments() {
1314  heavyai::shared_lock<heavyai::shared_mutex> readLock(fragmentInfoMutex_);
1315  return fragmentInfoVec_.size();
1316 }
1317 
1318 TableInfo InsertOrderFragmenter::getFragmentsForQuery() {
1319  heavyai::shared_lock<heavyai::shared_mutex> readLock(fragmentInfoMutex_);
1320  TableInfo queryInfo;
1321  queryInfo.chunkKeyPrefix = chunkKeyPrefix_;
1322  // right now we don't test predicate, so just return (copy of) all fragments
1323  bool fragmentsExist = false;
1324  if (fragmentInfoVec_.empty()) {
1325  // If we have no fragments add a dummy empty fragment to make the executor
1326  // not have separate logic for 0-row tables
1327  int maxFragmentId = 0;
1328  FragmentInfo emptyFragmentInfo;
1329  emptyFragmentInfo.fragmentId = maxFragmentId;
1330  emptyFragmentInfo.shadowNumTuples = 0;
1331  emptyFragmentInfo.setPhysicalNumTuples(0);
1332  emptyFragmentInfo.deviceIds.resize(dataMgr_->levelSizes_.size());
1333  emptyFragmentInfo.physicalTableId = physicalTableId_;
1334  emptyFragmentInfo.shard = shard_;
1335  queryInfo.fragments.push_back(emptyFragmentInfo);
1336  } else {
1337  fragmentsExist = true;
1338  std::for_each(
1339  fragmentInfoVec_.begin(),
1340  fragmentInfoVec_.end(),
1341  [&queryInfo](const auto& fragment_owned_ptr) {
1342  queryInfo.fragments.emplace_back(*fragment_owned_ptr); // makes a copy
1343  });
1344  }
1345  readLock.unlock();
1346  queryInfo.setPhysicalNumTuples(0);
1347  auto partIt = queryInfo.fragments.begin();
1348  if (fragmentsExist) {
1349  while (partIt != queryInfo.fragments.end()) {
1350  if (partIt->getPhysicalNumTuples() == 0) {
1351  // this means that a concurrent insert query inserted tuples into a new fragment
1352  // but when the query came in we didn't have this fragment. To make sure we
1353  // don't mess up the executor we delete this fragment from the metadatamap
1354  // (fixes earlier bug found 2015-05-08)
1355  partIt = queryInfo.fragments.erase(partIt);
1356  } else {
1357  queryInfo.setPhysicalNumTuples(queryInfo.getPhysicalNumTuples() +
1358  partIt->getPhysicalNumTuples());
1359  ++partIt;
1360  }
1361  }
1362  } else {
1363  // We added a dummy fragment and know the table is empty
1364  queryInfo.setPhysicalNumTuples(0);
1365  }
1366  return queryInfo;
1367 }
1368 
1369 void InsertOrderFragmenter::resetSizesFromFragments() {
1370  heavyai::shared_lock<heavyai::shared_mutex> read_lock(fragmentInfoMutex_);
1371  numTuples_ = 0;
1372  for (const auto& fragment_info : fragmentInfoVec_) {
1373  numTuples_ += fragment_info->getPhysicalNumTuples();
1374  }
1375  setLastFragmentVarLenColumnSizes();
1376 }
1377 
1378 void InsertOrderFragmenter::alterColumnGeoType(
1379  const std::list<
1380  std::pair<const ColumnDescriptor*, std::list<const ColumnDescriptor*>>>&
1381  src_dst_column_pairs) {
1382  CHECK(defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL &&
1383  !uses_foreign_storage_)
1384  << "`alterColumnTypeTransactional` only supported for regular tables";
1385  heavyai::unique_lock<heavyai::shared_mutex> write_lock(fragmentInfoMutex_);
1386 
1387  for (const auto& [src_cd, dst_columns] : src_dst_column_pairs) {
1388  auto logical_geo_column = *dst_columns.begin();
1389  CHECK(logical_geo_column->columnType.is_geometry());
1390 
1391  columnMap_.erase(
1392  src_cd->columnId); // NOTE: Necessary to prevent unpinning issues with these
1393  // chunks when fragmenter is destroyed later.
1394 
1395  for (const auto& fragment_info : fragmentInfoVec_) {
1396  int device_id = fragment_info->deviceIds[static_cast<int>(defaultInsertLevel_)];
1397  auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1398 
1399  CHECK_GE(dst_columns.size(), 1UL);
1400 
1401  std::list<const ColumnDescriptor*> columns = dst_columns;
1402  GeoAlterColumnContext alter_column_context{device_id,
1403  chunkKeyPrefix_,
1404  fragment_info.get(),
1405  src_cd,
1406  *dst_columns.begin(),
1407  num_elements,
1408  dataMgr_,
1409  catalog_,
1410  columnMap_,
1411  columns};
1412 
1413  alter_column_context.readSourceData();
1414 
1415  alter_column_context.createScratchBuffers();
1416 
1417  ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1418 
1419  const bool geo_validate_geometry = false;
1420  alter_column_context.encodeData(geo_validate_geometry);
1421 
1422  alter_column_context.putBuffersToDisk();
1423  }
1424  }
1425 }
1426 
1427 void InsertOrderFragmenter::alterNonGeoColumnType(
1428  const std::list<const ColumnDescriptor*>& columns) {
1429  CHECK(defaultInsertLevel_ == Data_Namespace::MemoryLevel::DISK_LEVEL &&
1430  !uses_foreign_storage_)
1431  << "`alterColumnTypeTransactional` only supported for regular tables";
1432 
1433  heavyai::unique_lock<heavyai::shared_mutex> write_lock(fragmentInfoMutex_);
1434 
1435  for (const auto dst_cd : columns) {
1436  auto col_it = columnMap_.find(dst_cd->columnId);
1437  CHECK(col_it != columnMap_.end());
1438 
1439  auto src_cd = col_it->second.getColumnDesc();
1440  CHECK_EQ(col_it->first, src_cd->columnId);
1441 
1442  if (ddl_utils::alter_column_utils::compare_column_descriptors(src_cd, dst_cd)
1443  .sql_types_match) {
1444  continue;
1445  }
1446 
1447  for (const auto& fragment_info : fragmentInfoVec_) {
1448  int device_id = fragment_info->deviceIds[static_cast<int>(defaultInsertLevel_)];
1449  auto num_elements = fragment_info->chunkMetadataMap[src_cd->columnId]->numElements;
1450 
1451  NonGeoAlterColumnContext alter_column_context{device_id,
1452  chunkKeyPrefix_,
1453  fragment_info.get(),
1454  src_cd,
1455  dst_cd,
1456  num_elements,
1457  dataMgr_,
1458  catalog_,
1459  columnMap_};
1460 
1461  alter_column_context.readSourceData();
1462 
1463  alter_column_context.createScratchBuffers();
1464 
1465  ScopeGuard delete_temp_chunk = [&] { alter_column_context.deleteScratchBuffers(); };
1466 
1467  alter_column_context.reencodeData();
1468 
1469  alter_column_context.putBuffersToDisk();
1470  }
1471  }
1472 }
1473 
1474 void InsertOrderFragmenter::setLastFragmentVarLenColumnSizes() {
1475  if (!uses_foreign_storage_ && fragmentInfoVec_.size() > 0) {
1476  // Now need to get the insert buffers for each column - should be last
1477  // fragment
1478  int lastFragmentId = fragmentInfoVec_.back()->fragmentId;
1479  // TODO: add accessor here for safe indexing
1480  int deviceId =
1481  fragmentInfoVec_.back()->deviceIds[static_cast<int>(defaultInsertLevel_)];
1482  for (auto colIt = columnMap_.begin(); colIt != columnMap_.end(); ++colIt) {
1483  ChunkKey insertKey = chunkKeyPrefix_; // database_id and table_id
1484  insertKey.push_back(colIt->first); // column id
1485  insertKey.push_back(lastFragmentId); // fragment id
1486  colIt->second.getChunkBuffer(dataMgr_, insertKey, defaultInsertLevel_, deviceId);
1487  auto varLenColInfoIt = varLenColInfo_.find(colIt->first);
1488  if (varLenColInfoIt != varLenColInfo_.end()) {
1489  varLenColInfoIt->second = colIt->second.getBuffer()->size();
1490  }
1491  }
1492  }
1493 }
1494 } // namespace Fragmenter_Namespace