OmniSciDB  cde582ebc3
UpdelStorage.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <mutex>
19 #include <string>
20 #include <vector>
21 
22 #include <boost/variant.hpp>
23 #include <boost/variant/get.hpp>
24 
25 #include "Catalog/Catalog.h"
29 #include "LockMgr/LockMgr.h"
30 #include "QueryEngine/Execute.h"
31 #include "Shared/DateConverters.h"
33 #include "Shared/thread_count.h"
35 
36 extern bool g_enable_string_functions;
37 
39 
40 namespace Fragmenter_Namespace {
41 
42 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
43  for (auto& t : threads) {
44  t.get();
45  }
46  threads.clear();
47 }
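Most of the parallel work in this file follows the same shape: split the rows into ceil(n / ncore) segments, launch one std::async task per segment, and drain the futures the way wait_cleanup_threads() does. A minimal stand-alone sketch of that pattern (the function name and the per-row work are illustrative, not taken from this file):

#include <algorithm>
#include <future>
#include <vector>

// Illustrative sketch only: same segment-per-thread shape as the loops below.
void parallel_for_segments(const size_t nrow, const size_t ncore) {
  std::vector<std::future<void>> threads;
  const size_t segsz = (nrow + ncore - 1) / ncore;  // ceil(nrow / ncore)
  for (size_t rbegin = 0; rbegin < nrow; rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [=] {
      const auto rend = std::min(rbegin + segsz, nrow);
      for (size_t r = rbegin; r < rend; ++r) {
        // ... per-row work goes here ...
      }
    }));
  }
  for (auto& t : threads) {  // the same draining wait_cleanup_threads performs
    t.get();
  }
  threads.clear();
}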
48 
49 inline bool is_integral(const SQLTypeInfo& t) {
50  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
51 }
52 
54 
55 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
56  const TableDescriptor* td,
57  const ColumnDescriptor* cd,
58  const int fragment_id,
59  const std::vector<uint64_t>& frag_offsets,
60  const ScalarTargetValue& rhs_value,
61  const SQLTypeInfo& rhs_type,
62  const Data_Namespace::MemoryLevel memory_level,
63  UpdelRoll& updel_roll) {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
74 
75 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
76  const TableDescriptor* td,
77  const FragmentInfo& fragment,
78  const Data_Namespace::MemoryLevel memory_level,
79  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
80  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
81  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
82  ++nc;
83  if (!cd->isVirtualCol) {
84  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
85  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
86  ChunkKey chunk_key{
87  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
88  auto chunk = Chunk_NS::Chunk::getChunk(cd,
89  &catalog->getDataMgr(),
90  chunk_key,
91  memory_level,
92  0,
93  chunk_meta_it->second->numBytes,
94  chunk_meta_it->second->numElements);
95  chunks.push_back(chunk);
96  }
97  }
98  }
99  return chunks.size();
100 }
101 
102 class ChunkToInsertDataConverter {
103  public:
104  virtual ~ChunkToInsertDataConverter() {}
105 
106  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
107 
108  virtual void addDataBlocksToInsertData(
109  Fragmenter_Namespace::InsertData& insertData) = 0;
110 };
111 
112 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
113 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
114  using ColumnDataPtr =
115  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
116 
117  const Chunk_NS::Chunk* chunk_;
118  const ColumnDescriptor* column_descriptor_;
119  ColumnDataPtr column_data_;
120  const BUFFER_DATA_TYPE* data_buffer_addr_;
121 
122  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
123  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
124  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
125  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
126  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
127  }
128 
129  ~ScalarChunkConverter() override {}
130 
131  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
132  auto buffer_value = data_buffer_addr_[indexInFragment];
133  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
134  column_data_.get()[row] = insert_value;
135  }
136 
137  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
138  DataBlockPtr dataBlock;
139  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
140  insertData.data.push_back(dataBlock);
141  insertData.columnIds.push_back(column_descriptor_->columnId);
142  }
143 };
144 
145 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
146  const Chunk_NS::Chunk* chunk_;
147  const ColumnDescriptor* column_descriptor_;
148 
149  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
150  int8_t* data_buffer_addr_;
151  size_t fixed_array_length_;
152 
153  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
154  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
155  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
156  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
157  fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
158  }
159 
160  ~FixedLenArrayChunkConverter() override {}
161 
162  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
163  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
164 
166  src_value_ptr);
167 
168  (*column_data_)[row] = ArrayDatum(
169  fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
170  }
171 
172  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
173  DataBlockPtr dataBlock;
174  dataBlock.arraysPtr = column_data_.get();
175  insertData.data.push_back(dataBlock);
176  insertData.columnIds.push_back(column_descriptor_->columnId);
177  }
178 };
179 
180 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
181  StringOffsetT* index_buffer_addr_;
182 
183  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
184  : FixedLenArrayChunkConverter(num_rows, chunk) {
185  index_buffer_addr_ =
186  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
187  : nullptr);
188  }
189 
190  ~ArrayChunkConverter() override {}
191 
192  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
193  auto startIndex = index_buffer_addr_[indexInFragment];
194  auto endIndex = index_buffer_addr_[indexInFragment + 1];
195  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
196  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
197  (*column_data_)[row] = ArrayDatum(
198  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
199  }
200 };
201 
202 struct StringChunkConverter : public ChunkToInsertDataConverter {
203  const Chunk_NS::Chunk* chunk_;
204  const ColumnDescriptor* column_descriptor_;
205 
206  std::unique_ptr<std::vector<std::string>> column_data_;
207  const int8_t* data_buffer_addr_;
208  const StringOffsetT* index_buffer_addr_;
209 
210  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
211  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
212  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
213  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
214  index_buffer_addr_ =
215  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
216  : nullptr);
217  }
218 
219  ~StringChunkConverter() override {}
220 
221  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
222  size_t src_value_size =
223  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
224  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
225  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
226  }
227 
228  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
229  DataBlockPtr dataBlock;
230  dataBlock.stringsPtr = column_data_.get();
231  insertData.data.push_back(dataBlock);
232  insertData.columnIds.push_back(column_descriptor_->columnId);
233  }
234 };
235 
236 template <typename BUFFER_DATA_TYPE>
237 struct DateChunkConverter : public ChunkToInsertDataConverter {
238  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
239 
240  const Chunk_NS::Chunk* chunk_;
241  const ColumnDescriptor* column_descriptor_;
242  ColumnDataPtr column_data_;
243  const BUFFER_DATA_TYPE* data_buffer_addr_;
244 
245  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
246  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
247  column_data_ = ColumnDataPtr(
248  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
249  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
250  }
251 
252  ~DateChunkConverter() override {}
253 
254  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
255  auto buffer_value = data_buffer_addr_[indexInFragment];
256  auto insert_value = static_cast<int64_t>(buffer_value);
257  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
258  }
259 
260  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
261  DataBlockPtr dataBlock;
262  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
263  insertData.data.push_back(dataBlock);
264  insertData.columnIds.push_back(column_descriptor_->columnId);
265  }
266 };
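DateChunkConverter above widens day-encoded dates to 64-bit epoch seconds before they are handed back to the insert path, which re-encodes them as days. A stand-alone illustration of that scaling, assuming the conventional 86,400 seconds per day used by DateConverters::get_epoch_seconds_from_days (the sample value is made up):

#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical DATE ENCODING DAYS value: days since the UNIX epoch.
  constexpr int32_t days_since_epoch = 18000;
  constexpr int64_t seconds_per_day = 86400;
  // The converter hands the insert path seconds; DateDaysEncoder turns them
  // back into days on write.
  const int64_t epoch_seconds =
      static_cast<int64_t>(days_since_epoch) * seconds_per_day;
  std::cout << epoch_seconds << '\n';  // prints 1555200000
  return 0;
}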
267 
268 void InsertOrderFragmenter::updateColumns(
269  const Catalog_Namespace::Catalog* catalog,
270  const TableDescriptor* td,
271  const int fragmentId,
272  const std::vector<TargetMetaInfo> sourceMetaInfo,
273  const std::vector<const ColumnDescriptor*> columnDescriptors,
274  const RowDataProvider& sourceDataProvider,
275  const size_t indexOffFragmentOffsetColumn,
276  const Data_Namespace::MemoryLevel memoryLevel,
277  UpdelRoll& updelRoll,
278  Executor* executor) {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
292  TargetValueConverterFactory factory;
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
326  num_rows,
327  *catalog,
328  sourceDataMetaInfo,
329  targetDescriptor,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().get_comp_param(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr,
340  nullptr};
341  auto converter = factory.create(param);
342  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
343 
344  if (targetDescriptor->columnType.is_geometry()) {
345  // geometry columns are composites
346  // need to skip chunks, depending on geo type
347  switch (targetDescriptor->columnType.get_type()) {
348  case kMULTIPOLYGON:
349  indexOfChunk += 5;
350  break;
351  case kPOLYGON:
352  indexOfChunk += 4;
353  break;
354  case kLINESTRING:
355  indexOfChunk += 2;
356  break;
357  case kPOINT:
358  indexOfChunk += 1;
359  break;
360  default:
361  CHECK(false); // not supported
362  }
363  }
364  } else {
365  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
366  std::unique_ptr<ChunkToInsertDataConverter> converter;
367 
368  if (chunk_cd->columnType.is_fixlen_array()) {
369  converter =
370  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
371  } else if (chunk_cd->columnType.is_string()) {
372  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
373  } else if (chunk_cd->columnType.is_geometry()) {
374  // the logical geo column is a string column
375  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
376  } else {
377  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
378  }
379 
380  chunkConverters.push_back(std::move(converter));
381 
382  } else if (chunk_cd->columnType.is_date_in_days()) {
383  /* Q: Why do we need this?
384  A: In the variable-length update path we move the chunk content of a column
385  without decoding it. Since the data passes through DateDaysEncoder again,
386  the expected value should be in seconds, but here it would be in days.
387  Therefore, DateChunkConverter scales the chunk values to seconds, which
388  DateDaysEncoder then ultimately re-encodes as days.
389  */
390  std::unique_ptr<ChunkToInsertDataConverter> converter;
391  const size_t physical_size = chunk_cd->columnType.get_size();
392  if (physical_size == 2) {
393  converter =
394  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
395  } else if (physical_size == 4) {
396  converter =
397  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
398  } else {
399  CHECK(false);
400  }
401  chunkConverters.push_back(std::move(converter));
402  } else {
403  std::unique_ptr<ChunkToInsertDataConverter> converter;
404  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
405  int logical_size = logical_type.get_size();
406  int physical_size = chunk_cd->columnType.get_size();
407 
408  if (logical_type.is_string()) {
409  // for dicts -> logical = physical
410  logical_size = physical_size;
411  }
412 
413  if (8 == physical_size) {
414  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
415  num_rows, chunk.get());
416  } else if (4 == physical_size) {
417  if (8 == logical_size) {
418  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
419  num_rows, chunk.get());
420  } else {
421  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
422  num_rows, chunk.get());
423  }
424  } else if (2 == chunk_cd->columnType.get_size()) {
425  if (8 == logical_size) {
426  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
427  num_rows, chunk.get());
428  } else if (4 == logical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
430  num_rows, chunk.get());
431  } else {
432  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
433  num_rows, chunk.get());
434  }
435  } else if (1 == chunk_cd->columnType.get_size()) {
436  if (8 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
438  num_rows, chunk.get());
439  } else if (4 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
441  num_rows, chunk.get());
442  } else if (2 == logical_size) {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
444  num_rows, chunk.get());
445  } else {
446  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
447  num_rows, chunk.get());
448  }
449  } else {
450  CHECK(false); // unknown
451  }
452 
453  chunkConverters.push_back(std::move(converter));
454  }
455  }
456  }
457 
458  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
459  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
460 
461  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
462  bool* deletedChunkBuffer =
463  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
464 
465  std::atomic<size_t> row_idx{0};
466 
467  auto row_converter = [&sourceDataProvider,
468  &sourceDataConverters,
469  &indexOffFragmentOffsetColumn,
470  &chunkConverters,
471  &deletedChunkBuffer,
472  &row_idx](size_t indexOfEntry) -> void {
473  // convert the source data
474  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
475  if (row.empty()) {
476  return;
477  }
478 
479  size_t indexOfRow = row_idx.fetch_add(1);
480 
481  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
482  if (sourceDataConverters[col]) {
483  const auto& mapd_variant = row[col];
484  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
485  }
486  }
487 
488  auto scalar = checked_get(
489  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
490  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
491 
492  // convert the remaining chunks
493  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
494  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
495  }
496 
497  // now mark the row as deleted
498  deletedChunkBuffer[indexInChunkBuffer] = true;
499  };
500 
501  bool can_go_parallel = num_rows > 20000;
502 
503  if (can_go_parallel) {
504  const size_t num_worker_threads = cpu_threads();
505  std::vector<std::future<void>> worker_threads;
506  for (size_t i = 0,
507  start_entry = 0,
508  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
509  i < num_worker_threads && start_entry < num_entries;
510  ++i, start_entry += stride) {
511  const auto end_entry = std::min(start_entry + stride, num_rows);
512  worker_threads.push_back(std::async(
513  std::launch::async,
514  [&row_converter](const size_t start, const size_t end) {
515  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
516  row_converter(indexOfRow);
517  }
518  },
519  start_entry,
520  end_entry));
521  }
522 
523  for (auto& child : worker_threads) {
524  child.wait();
525  }
526 
527  } else {
528  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
529  row_converter(entryIdx);
530  }
531  }
532 
533  Fragmenter_Namespace::InsertData insert_data;
534  insert_data.databaseId = catalog->getCurrentDB().dbId;
535  insert_data.tableId = td->tableId;
536 
537  for (size_t i = 0; i < chunkConverters.size(); i++) {
538  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
539  continue;
540  }
541 
542  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
543  if (sourceDataConverters[i]) {
544  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
545  }
546  continue;
547  }
548 
549  insert_data.numRows = num_rows;
550  insert_data.is_default.resize(insert_data.columnIds.size(), false);
551  insertDataNoCheckpoint(insert_data);
552 
553  // update metadata for the deleted chunk as we are doing special handling
554  auto chunkMetadata =
555  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
556  chunkMetadata->chunkStats.max.boolval = 1;
557 
558  // I'm not completely sure that we need to do this in the fragmenter and on the
559  // buffer, but leaving this alone for now
560  if (!deletedChunk->getBuffer()->hasEncoder()) {
561  deletedChunk->initEncoder();
562  }
563  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
564 
565  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
566  // An append to the same fragment will increase shadowNumTuples.
567  // Update NumElems in this case. Otherwise, use existing NumElems.
568  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
569  }
570  deletedChunk->getBuffer()->setUpdated();
571 }
572 
573 namespace {
574 inline void update_metadata(SQLTypeInfo const& ti,
575  ChunkUpdateStats& update_stats,
576  int64_t const updated_val,
577  int64_t const old_val,
578  NullSentinelSupplier s = NullSentinelSupplier()) {
579  if (ti.get_notnull()) {
580  set_minmax(update_stats.new_values_stats.min_int64t,
581  update_stats.new_values_stats.max_int64t,
582  updated_val);
583  set_minmax(update_stats.old_values_stats.min_int64t,
584  update_stats.old_values_stats.max_int64t,
585  old_val);
586  } else {
587  set_minmax(update_stats.new_values_stats.min_int64t,
588  update_stats.new_values_stats.max_int64t,
589  update_stats.new_values_stats.has_null,
590  updated_val,
591  s(ti, updated_val));
592  set_minmax(update_stats.old_values_stats.min_int64t,
593  update_stats.old_values_stats.max_int64t,
594  update_stats.old_values_stats.has_null,
595  old_val,
596  s(ti, old_val));
597  }
598 }
599 
600 inline void update_metadata(SQLTypeInfo const& ti,
601  ChunkUpdateStats& update_stats,
602  double const updated_val,
603  double const old_val,
604  NullSentinelSupplier s = NullSentinelSupplier()) {
605  if (ti.get_notnull()) {
606  set_minmax(update_stats.new_values_stats.min_double,
607  update_stats.new_values_stats.max_double,
608  updated_val);
609  set_minmax(update_stats.old_values_stats.min_double,
610  update_stats.old_values_stats.max_double,
611  old_val);
612  } else {
613  set_minmax(update_stats.new_values_stats.min_double,
614  update_stats.new_values_stats.max_double,
615  update_stats.new_values_stats.has_null,
616  updated_val,
617  s(ti, updated_val));
618  set_minmax(update_stats.old_values_stats.min_double,
619  update_stats.old_values_stats.max_double,
620  update_stats.old_values_stats.has_null,
621  old_val,
622  s(ti, old_val));
623  }
624 }
625 
626 inline void update_metadata(UpdateValuesStats& agg_stats,
627  const UpdateValuesStats& new_stats) {
628  agg_stats.has_null = agg_stats.has_null || new_stats.has_null;
629  agg_stats.max_double = std::max<double>(agg_stats.max_double, new_stats.max_double);
630  agg_stats.min_double = std::min<double>(agg_stats.min_double, new_stats.min_double);
631  agg_stats.max_int64t = std::max<int64_t>(agg_stats.max_int64t, new_stats.max_int64t);
632  agg_stats.min_int64t = std::min<int64_t>(agg_stats.min_int64t, new_stats.min_int64t);
633 }
634 } // namespace
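The update_metadata helpers above keep per-thread running min/max/has_null statistics, which are later folded into one aggregate. A simplified, self-contained version of that fold (SimpleStats and merge_stats are stand-ins for this file's UpdateValuesStats and the aggregating update_metadata overload, not their real definitions):

#include <algorithm>
#include <cstdint>
#include <limits>

struct SimpleStats {  // stand-in for UpdateValuesStats (integer part only)
  bool has_null{false};
  int64_t min_int64t{std::numeric_limits<int64_t>::max()};
  int64_t max_int64t{std::numeric_limits<int64_t>::lowest()};
};

// Fold one thread's partial stats into the aggregate.
inline void merge_stats(SimpleStats& agg, const SimpleStats& part) {
  agg.has_null = agg.has_null || part.has_null;
  agg.min_int64t = std::min(agg.min_int64t, part.min_int64t);
  agg.max_int64t = std::max(agg.max_int64t, part.max_int64t);
}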
635 
636 std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
637  const Catalog_Namespace::Catalog* catalog,
638  const TableDescriptor* td,
639  const ColumnDescriptor* cd,
640  const int fragment_id,
641  const std::vector<uint64_t>& frag_offsets,
642  const std::vector<ScalarTargetValue>& rhs_values,
643  const SQLTypeInfo& rhs_type,
644  const Data_Namespace::MemoryLevel memory_level,
645  UpdelRoll& updel_roll) {
646  updel_roll.catalog = catalog;
647  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
648  updel_roll.memoryLevel = memory_level;
649 
650  const size_t ncore = cpu_threads();
651  const auto nrow = frag_offsets.size();
652  const auto n_rhs_values = rhs_values.size();
653  if (0 == nrow) {
654  return {};
655  }
656  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
657 
658  auto fragment_ptr = getFragmentInfo(fragment_id);
659  auto& fragment = *fragment_ptr;
660  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
661  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
662  ChunkKey chunk_key{
663  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
664  auto chunk = Chunk_NS::Chunk::getChunk(cd,
665  &catalog->getDataMgr(),
666  chunk_key,
667  memory_level,
668  0,
669  chunk_meta_it->second->numBytes,
670  chunk_meta_it->second->numElements);
671 
672  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
673 
674  // parallel update elements
675  std::vector<std::future<void>> threads;
676 
677  const auto segsz = (nrow + ncore - 1) / ncore;
678  auto dbuf = chunk->getBuffer();
679  auto dbuf_addr = dbuf->getMemoryPtr();
680  dbuf->setUpdated();
681  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
682  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
683  threads.emplace_back(std::async(
684  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
685  SQLTypeInfo lhs_type = cd->columnType;
686 
687  // !! not sure if this is an undocumented convention or a bug, but for a sharded
688  // table the dictionary id of an encoded string column is not specified by
689  // comp_param in the physical table but somehow in the logical table :) comp_param
690  // in the physical table is always 0, so we need to adapt accordingly...
691  auto cdl = (shard_ < 0)
692  ? cd
693  : catalog->getMetadataForColumn(
694  catalog->getLogicalTableId(td->tableId), cd->columnId);
695  CHECK(cdl);
696  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
697  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
698  lhs_type, &decimalOverflowValidator);
699  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
700  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
701  lhs_type, &dateDaysOverflowValidator);
702 
703  StringDictionary* stringDict{nullptr};
704  if (lhs_type.is_string()) {
705  CHECK(kENCODING_DICT == lhs_type.get_compression());
706  auto dictDesc = const_cast<DictDescriptor*>(
707  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
708  CHECK(dictDesc);
709  stringDict = dictDesc->stringDict.get();
710  CHECK(stringDict);
711  }
712 
713  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
714  const auto roffs = frag_offsets[r];
715  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
716  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
717  ScalarTargetValue sv2;
718 
719  // Subtle here are the two cases of string-to-string assignment, where
720  // upstream passes the RHS string as a string index instead of the
721  // preferred "real string".
722  // case #1. For "SET str_col = str_literal", it is hard to resolve a temp
723  // string index in this layer, so if upstream passes a string index
724  // here, an exception is thrown.
725  // case #2. For "SET str_col1 = str_col2", the RHS string index is
726  // converted to the LHS string index.
728  if (rhs_type.is_string()) {
729  if (const auto vp = boost::get<int64_t>(sv)) {
730  auto dictDesc = const_cast<DictDescriptor*>(
731  catalog->getMetadataForDict(rhs_type.get_comp_param()));
732  if (nullptr == dictDesc) {
733  throw std::runtime_error(
734  "UPDATE does not support cast from string literal to string "
735  "column.");
736  }
737  auto stringDict = dictDesc->stringDict.get();
738  CHECK(stringDict);
739  sv2 = NullableString(stringDict->getString(*vp));
740  sv = &sv2;
741  }
742  }
743 
744  if (const auto vp = boost::get<int64_t>(sv)) {
745  auto v = *vp;
746  if (lhs_type.is_string()) {
747  throw std::runtime_error("UPDATE does not support cast to string.");
748  }
749  int64_t old_val;
750  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
751  // Handle special case where date column with date in days encoding stores
752  // metadata in epoch seconds.
753  if (lhs_type.is_date_in_days()) {
754  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
755  }
756  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
757  if (lhs_type.is_decimal()) {
758  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
759  int64_t decimal_val;
760  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
761  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
762  lhs_type.get_notnull() == false)
763  ? v
764  : decimal_val;
765  update_metadata(
766  lhs_type, update_stats_per_thread[c], target_value, old_val);
767  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
768  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
769  if (positive_v_and_negative_d || negative_v_and_positive_d) {
770  throw std::runtime_error(
771  "Data conversion overflow on " + std::to_string(v) +
772  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
773  std::to_string(rhs_type.get_scale()) + ") to (" +
774  std::to_string(lhs_type.get_dimension()) + ", " +
775  std::to_string(lhs_type.get_scale()) + ")");
776  }
777  } else if (is_integral(lhs_type)) {
778  if (lhs_type.is_date_in_days()) {
779  // Store meta values in seconds
780  if (lhs_type.get_size() == 2) {
781  nullAwareDateOverflowValidator.validate<int16_t>(v);
782  } else {
783  nullAwareDateOverflowValidator.validate<int32_t>(v);
784  }
785  int64_t days;
786  get_scalar<int64_t>(data_ptr, lhs_type, days);
787  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
788  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
789  lhs_type.get_notnull() == false)
790  ? NullSentinelSupplier()(lhs_type, v)
791  : seconds;
792  update_metadata(
793  lhs_type, update_stats_per_thread[c], target_value, old_val);
794  } else {
795  int64_t target_value;
796  if (rhs_type.is_decimal()) {
797  target_value = round(decimal_to_double(rhs_type, v));
798  } else {
799  target_value = v;
800  }
801  update_metadata(
802  lhs_type, update_stats_per_thread[c], target_value, old_val);
803  }
804  } else {
805  if (rhs_type.is_decimal()) {
806  update_metadata(lhs_type,
807  update_stats_per_thread[c],
808  decimal_to_double(rhs_type, v),
809  double(old_val));
810  } else {
811  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
812  }
813  }
814  } else if (const auto vp = boost::get<double>(sv)) {
815  auto v = *vp;
816  if (lhs_type.is_string()) {
817  throw std::runtime_error("UPDATE does not support cast to string.");
818  }
819  double old_val;
820  get_scalar<double>(data_ptr, lhs_type, old_val);
821  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
822  if (lhs_type.is_integer()) {
823  update_metadata(
824  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
825  } else if (lhs_type.is_fp()) {
826  update_metadata(
827  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
828  } else {
829  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
830  "LHS with a floating RHS.";
831  }
832  } else if (const auto vp = boost::get<float>(sv)) {
833  auto v = *vp;
834  if (lhs_type.is_string()) {
835  throw std::runtime_error("UPDATE does not support cast to string.");
836  }
837  float old_val;
838  get_scalar<float>(data_ptr, lhs_type, old_val);
839  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
840  if (lhs_type.is_integer()) {
841  update_metadata(
842  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
843  } else {
844  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
845  }
846  } else if (const auto vp = boost::get<NullableString>(sv)) {
847  const auto s = boost::get<std::string>(vp);
848  const auto sval = s ? *s : std::string("");
849  if (lhs_type.is_string()) {
850  decltype(stringDict->getOrAdd(sval)) sidx;
851  {
852  std::unique_lock<std::mutex> lock(temp_mutex_);
853  sidx = stringDict->getOrAdd(sval);
854  }
855  int64_t old_val;
856  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
857  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
858  update_metadata(
859  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
860  } else if (sval.size() > 0) {
861  auto dval = std::atof(sval.data());
862  if (lhs_type.is_boolean()) {
863  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
864  } else if (lhs_type.is_time()) {
865  throw std::runtime_error(
866  "Date/Time/Timestamp update not supported through translated "
867  "string path.");
868  }
869  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
870  double old_val;
871  get_scalar<double>(data_ptr, lhs_type, old_val);
872  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
873  update_metadata(
874  lhs_type, update_stats_per_thread[c], double(dval), old_val);
875  } else {
876  int64_t old_val;
877  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
878  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
879  update_metadata(
880  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
881  }
882  } else {
883  put_null(data_ptr, lhs_type, cd->columnName);
884  update_stats_per_thread[c].new_values_stats.has_null = true;
885  }
886  } else {
887  CHECK(false);
888  }
889  }
890  }));
891  if (threads.size() >= (size_t)cpu_threads()) {
892  wait_cleanup_threads(threads);
893  }
894  }
895  wait_cleanup_threads(threads);
896 
897  // for unit test
898  if (Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_) {
899  if (cd->isDeletedCol) {
900  const auto deleted_offsets = getVacuumOffsets(chunk);
901  if (deleted_offsets.size() > 0) {
902  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
903  return {};
904  }
905  }
906  }
907  ChunkUpdateStats update_stats;
908  for (size_t c = 0; c < ncore; ++c) {
909  update_metadata(update_stats.new_values_stats,
910  update_stats_per_thread[c].new_values_stats);
911  update_metadata(update_stats.old_values_stats,
912  update_stats_per_thread[c].old_values_stats);
913  }
914 
915  CHECK_GT(fragment.shadowNumTuples, size_t(0));
916  updateColumnMetadata(
917  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
918  update_stats.updated_rows_count = nrow;
919  update_stats.fragment_rows_count = fragment.shadowNumTuples;
920  update_stats.chunk = chunk;
921  return update_stats;
922 }
923 
924 void InsertOrderFragmenter::updateColumnMetadata(
925  const ColumnDescriptor* cd,
926  FragmentInfo& fragment,
927  std::shared_ptr<Chunk_NS::Chunk> chunk,
928  const UpdateValuesStats& new_values_stats,
929  const SQLTypeInfo& rhs_type,
930  UpdelRoll& updel_roll) {
932  auto buffer = chunk->getBuffer();
933  const auto& lhs_type = cd->columnType;
934 
935  auto encoder = buffer->getEncoder();
936  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
937  static_assert(std::is_same<decltype(min), decltype(max)>::value,
938  "Type mismatch on min/max");
939  if (has_null) {
940  encoder->updateStats(decltype(min)(), true);
941  }
942  if (max < min) {
943  return;
944  }
945  encoder->updateStats(min, false);
946  encoder->updateStats(max, false);
947  };
948 
949  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
950  update_stats(new_values_stats.min_int64t,
951  new_values_stats.max_int64t,
952  new_values_stats.has_null);
953  } else if (lhs_type.is_fp()) {
954  update_stats(new_values_stats.min_double,
955  new_values_stats.max_double,
956  new_values_stats.has_null);
957  } else if (lhs_type.is_decimal()) {
958  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
959  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
960  new_values_stats.has_null);
961  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
962  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
963  update_stats(new_values_stats.min_int64t,
964  new_values_stats.max_int64t,
965  new_values_stats.has_null);
966  }
967  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
968  auto chunk_metadata =
969  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
970  buffer->getEncoder()->getMetadata(chunk_metadata);
971 }
972 
973 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
974  const MetaDataKey& key,
975  UpdelRoll& updel_roll) {
977  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
978  auto& fragmentInfo = *key.second;
979  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
980  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
981  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
982  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
983 }
984 
985 auto InsertOrderFragmenter::getChunksForAllColumns(
986  const TableDescriptor* td,
987  const FragmentInfo& fragment,
988  const Data_Namespace::MemoryLevel memory_level) {
989  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
990  // coming from updateColumn (on '$delete$' column) we don't have chunks for all columns
991  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
992  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
993  ++ncol;
994  if (!cd->isVirtualCol) {
995  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
996  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
997  ChunkKey chunk_key{
998  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
999  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1000  &catalog_->getDataMgr(),
1001  chunk_key,
1002  memory_level,
1003  0,
1004  chunk_meta_it->second->numBytes,
1005  chunk_meta_it->second->numElements);
1006  chunks.push_back(chunk);
1007  }
1008  }
1009  }
1010  return chunks;
1011 }
1012 
1013 // get a sorted vector of offsets of rows to vacuum
1014 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1015  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1016  const auto data_buffer = chunk->getBuffer();
1017  const auto data_addr = data_buffer->getMemoryPtr();
1018  const size_t nrows_in_chunk = data_buffer->size();
1019  const size_t ncore = cpu_threads();
1020  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1021  std::vector<std::vector<uint64_t>> deleted_offsets;
1022  deleted_offsets.resize(ncore);
1023  std::vector<std::future<void>> threads;
1024  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1025  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1026  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1027  const auto ithread = rbegin / segsz;
1028  CHECK(ithread < deleted_offsets.size());
1029  deleted_offsets[ithread].reserve(segsz);
1030  for (size_t r = rbegin; r < rend; ++r) {
1031  if (data_addr[r]) {
1032  deleted_offsets[ithread].push_back(r);
1033  }
1034  }
1035  }));
1036  }
1037  wait_cleanup_threads(threads);
1038  std::vector<uint64_t> all_deleted_offsets;
1039  for (size_t i = 0; i < ncore; ++i) {
1040  all_deleted_offsets.insert(
1041  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1042  }
1043  return all_deleted_offsets;
1044 }
1045 
1046 template <typename T>
1047 static void set_chunk_stats(const SQLTypeInfo& col_type,
1048  int8_t* data_addr,
1049  bool& has_null,
1050  T& min,
1051  T& max) {
1052  T v;
1053  const auto can_be_null = !col_type.get_notnull();
1054  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1055  if (is_null) {
1056  has_null = has_null || (can_be_null && is_null);
1057  } else {
1058  set_minmax(min, max, v);
1059  }
1060 }
1061 
1062 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1063  FragmentInfo& fragment,
1064  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1065  const size_t nrows_to_keep,
1066  UpdelRoll& updel_roll) {
1067  auto cd = chunk->getColumnDesc();
1068  auto td = catalog->getMetadataForTable(cd->tableId);
1069  auto data_buffer = chunk->getBuffer();
1070  auto chunkMetadata =
1071  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
1072  chunkMetadata->numElements = nrows_to_keep;
1073  chunkMetadata->numBytes = data_buffer->size();
1074  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
1075 }
1076 
1077 auto vacuum_fixlen_rows(
1078  const FragmentInfo& fragment,
1079  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1080  const std::vector<uint64_t>& frag_offsets) {
1081  const auto cd = chunk->getColumnDesc();
1082  const auto& col_type = cd->columnType;
1083  auto data_buffer = chunk->getBuffer();
1084  auto data_addr = data_buffer->getMemoryPtr();
1085  auto element_size =
1086  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1087  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1088  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1089  size_t nbytes_fix_data_to_keep = 0;
1090  auto nrows_to_vacuum = frag_offsets.size();
1091  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1092  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1093  auto is_last_one = irow == nrows_to_vacuum;
1094  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1095  auto maddr_to_vacuum = data_addr;
1096  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1097  if (nrows_to_keep > 0) {
1098  auto nbytes_to_keep = nrows_to_keep * element_size;
1099  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1100  // move curr fixlen row block toward front
1101  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1102  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1103  nbytes_to_keep);
1104  }
1105  irow_of_blk_to_fill += nrows_to_keep;
1106  nbytes_fix_data_to_keep += nbytes_to_keep;
1107  }
1108  irow_of_blk_to_keep = irow_to_vacuum + 1;
1109  }
1110  return nbytes_fix_data_to_keep;
1111 }
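vacuum_fixlen_rows compacts surviving rows in place: each run of rows between two deleted offsets is shifted toward the front of the buffer with memmove. A self-contained model of the same block-compaction loop over a plain byte vector (the function name and data are illustrative, not from this file):

#include <cstdint>
#include <cstring>
#include <vector>

// Drop the rows listed in deleted_offsets (sorted ascending) from a fixed-width
// buffer, compacting in place; returns the number of bytes kept.
size_t compact_fixlen(std::vector<uint8_t>& buf,
                      const size_t element_size,
                      const size_t nrows,
                      const std::vector<uint64_t>& deleted_offsets) {
  int64_t row_to_keep = 0;  // head of the next block of surviving rows
  int64_t row_to_fill = 0;  // destination row index for that block
  size_t bytes_kept = 0;
  for (size_t i = 0; i <= deleted_offsets.size(); ++i) {
    const int64_t next_deleted =
        (i == deleted_offsets.size()) ? static_cast<int64_t>(nrows)
                                      : static_cast<int64_t>(deleted_offsets[i]);
    const int64_t nrows_to_keep = next_deleted - row_to_keep;
    if (nrows_to_keep > 0) {
      const size_t nbytes = nrows_to_keep * element_size;
      if (row_to_fill != row_to_keep) {
        // move the current surviving block toward the front
        std::memmove(buf.data() + row_to_fill * element_size,
                     buf.data() + row_to_keep * element_size,
                     nbytes);
      }
      row_to_fill += nrows_to_keep;
      bytes_kept += nbytes;
    }
    row_to_keep = next_deleted + 1;
  }
  return bytes_kept;
}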
1112 
1113 // Gets the initial padding required for the chunk buffer. For variable length array
1114 // columns, if the first element after vacuuming is going to be a null array, a padding
1115 // with a value that is greater than 0 is expected.
1116 size_t get_null_padding(bool is_varlen_array,
1117  const std::vector<uint64_t>& frag_offsets,
1118  const StringOffsetT* index_array,
1119  size_t fragment_row_count) {
1120  if (is_varlen_array) {
1121  size_t first_non_deleted_row_index{0};
1122  for (auto deleted_offset : frag_offsets) {
1123  if (first_non_deleted_row_index < deleted_offset) {
1124  break;
1125  } else {
1126  first_non_deleted_row_index++;
1127  }
1128  }
1129  CHECK_LT(first_non_deleted_row_index, fragment_row_count);
1130  if (first_non_deleted_row_index == 0) {
1131  // If the first row in the fragment is not deleted, then the first offset in the
1132  // index buffer/array already contains expected padding.
1133  return index_array[0];
1134  } else {
1135  // If the first non-deleted element is a null array (identified by a negative
1136  // offset), get a padding value for the chunk buffer.
1137  if (index_array[first_non_deleted_row_index + 1] < 0) {
1138  size_t first_non_zero_offset{0};
1139  for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
1140  if (index_array[i] != 0) {
1141  first_non_zero_offset = index_array[i];
1142  break;
1143  }
1144  }
1145  CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
1147  first_non_zero_offset);
1148  } else {
1149  return 0;
1150  }
1151  }
1152  } else {
1153  return 0;
1154  }
1155 }
1156 
1157 // Gets the indexes of variable length null arrays in the chunk after vacuuming.
1158 std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
1159  const std::vector<uint64_t>& frag_offsets,
1160  const StringOffsetT* index_array,
1161  size_t fragment_row_count) {
1162  std::set<size_t> null_array_indexes;
1163  if (sql_type_info.is_varlen_array() && !sql_type_info.get_notnull()) {
1164  size_t frag_offset_index{0};
1165  size_t vacuum_offset{0};
1166  for (size_t i = 0; i < fragment_row_count; i++) {
1167  if (frag_offset_index < frag_offsets.size() &&
1168  i == frag_offsets[frag_offset_index]) {
1169  frag_offset_index++;
1170  vacuum_offset++;
1171  } else if (index_array[i + 1] < 0) {
1172  null_array_indexes.emplace(i - vacuum_offset);
1173  }
1174  }
1175  }
1176  return null_array_indexes;
1177 }
1178 
1179 StringOffsetT get_buffer_offset(bool is_varlen_array,
1180  const StringOffsetT* index_array,
1181  size_t index) {
1182  auto offset = index_array[index];
1183  if (offset < 0) {
1184  // Variable length arrays encode null arrays as negative offsets
1185  CHECK(is_varlen_array);
1186  offset = -offset;
1187  }
1188  return offset;
1189 }
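Variable-length array chunks mark a NULL array by negating that row's end offset in the index buffer; get_buffer_offset above strips the sign when computing byte positions. A small stand-alone demonstration of the convention (the offsets are invented for the example):

#include <cstdint>
#include <cstdlib>
#include <iostream>

int main() {
  // Index buffer for three arrays; row 1 is a NULL array, so its end offset
  // is stored negated (-24 instead of 24).
  const int32_t index_array[] = {0, 12, -24, 40};
  for (int row = 0; row < 3; ++row) {
    const bool is_null = index_array[row + 1] < 0;
    const int32_t begin = std::abs(index_array[row]);
    const int32_t end = std::abs(index_array[row + 1]);
    std::cout << "row " << row << ": bytes [" << begin << ", " << end << ")"
              << (is_null ? " (NULL array)" : "") << '\n';
  }
  return 0;
}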
1190 
1191 auto vacuum_varlen_rows(
1192  const FragmentInfo& fragment,
1193  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1194  const std::vector<uint64_t>& frag_offsets) {
1195  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1196  auto data_buffer = chunk->getBuffer();
1197  CHECK(data_buffer);
1198  auto index_buffer = chunk->getIndexBuf();
1199  CHECK(index_buffer);
1200  auto data_addr = data_buffer->getMemoryPtr();
1201  auto indices_addr = index_buffer->getMemoryPtr();
1202  CHECK(indices_addr);
1203  auto index_array = (StringOffsetT*)indices_addr;
1204  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1205  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1206  size_t nbytes_fix_data_to_keep = 0;
1207  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1208  size_t null_padding =
1209  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1210  size_t nbytes_var_data_to_keep = null_padding;
1211  auto null_array_indexes = get_var_len_null_array_indexes(
1212  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1213  auto nrows_to_vacuum = frag_offsets.size();
1214  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1215  auto is_last_one = irow == nrows_to_vacuum;
1216  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1217  auto maddr_to_vacuum = data_addr;
1218  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1219  if (nrows_to_keep > 0) {
1220  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1221  auto deleted_row_start_offset =
1222  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1223  auto kept_row_start_offset =
1224  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1225  auto nbytes_to_keep =
1226  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1227  kept_row_start_offset;
1228  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1229  if (nbytes_to_keep > 0) {
1230  CHECK(data_addr);
1231  // move curr varlen row block toward front
1232  memmove(data_addr + ibyte_var_data_to_keep,
1233  data_addr + kept_row_start_offset,
1234  nbytes_to_keep);
1235  }
1236 
1237  const auto base_offset = kept_row_start_offset;
1238  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1239  auto update_index = irow_of_blk_to_keep + i;
1240  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1241  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1242  }
1243  }
1244  nbytes_var_data_to_keep += nbytes_to_keep;
1245  maddr_to_vacuum = indices_addr;
1246 
1247  constexpr static auto index_element_size = sizeof(StringOffsetT);
1248  nbytes_to_keep = nrows_to_keep * index_element_size;
1249  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1250  // move curr fixlen row block toward front
1251  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1252  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1253  nbytes_to_keep);
1254  }
1255  irow_of_blk_to_fill += nrows_to_keep;
1256  nbytes_fix_data_to_keep += nbytes_to_keep;
1257  }
1258  irow_of_blk_to_keep = irow_to_vacuum + 1;
1259  }
1260 
1261  // Set expected null padding, last offset, and negative values for null array offsets.
1262  index_array[0] = null_padding;
1263  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1264  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1265  if (!is_varlen_array) {
1266  CHECK(null_array_indexes.empty());
1267  }
1268  for (auto index : null_array_indexes) {
1269  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1270  }
1271  return nbytes_var_data_to_keep;
1272 }
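When vacuum_varlen_rows shifts a surviving block of variable-length payload forward, the corresponding index entries are rebased as new_offset = bytes_already_kept + (old_offset - block_base_offset). A tiny worked example under that same assumption (the numbers are invented):

#include <cstdint>
#include <iostream>

int main() {
  // A kept block originally started at byte 100 of the data buffer, and 40
  // bytes of earlier surviving rows have already been retained.
  const int32_t base_offset = 100;        // kept_row_start_offset
  const int32_t bytes_already_kept = 40;  // nbytes_var_data_to_keep so far
  const int32_t old_offset = 130;         // an index entry inside the block
  const int32_t new_offset = bytes_already_kept + (old_offset - base_offset);
  std::cout << new_offset << '\n';  // prints 70
  return 0;
}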
1273 
1274 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1275  const TableDescriptor* td,
1276  const int fragment_id,
1277  const std::vector<uint64_t>& frag_offsets,
1278  const Data_Namespace::MemoryLevel memory_level,
1279  UpdelRoll& updel_roll) {
1280  auto fragment_ptr = getFragmentInfo(fragment_id);
1281  auto& fragment = *fragment_ptr;
1282  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1283  const auto ncol = chunks.size();
1284 
1285  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1286 
1287  // parallel delete columns
1288  std::vector<std::future<void>> threads;
1289  auto nrows_to_vacuum = frag_offsets.size();
1290  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1291  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1292 
1293  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1294  auto chunk = chunks[ci];
1295  const auto cd = chunk->getColumnDesc();
1296  const auto& col_type = cd->columnType;
1297  auto data_buffer = chunk->getBuffer();
1298  auto index_buffer = chunk->getIndexBuf();
1299  auto data_addr = data_buffer->getMemoryPtr();
1300  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1301  auto index_array = (StringOffsetT*)indices_addr;
1302  bool is_varlen = col_type.is_varlen_indeed();
1303 
1304  auto fixlen_vacuum =
1305  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1306  size_t nbytes_fix_data_to_keep;
1307  if (nrows_to_keep == 0) {
1308  nbytes_fix_data_to_keep = 0;
1309  } else {
1310  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1311  }
1312 
1313  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1314  data_buffer->setSize(nbytes_fix_data_to_keep);
1315  data_buffer->setUpdated();
1316 
1317  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1318 
1319  auto daddr = data_addr;
1320  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1321  : get_element_size(col_type);
1322  data_buffer->getEncoder()->resetChunkStats();
1323  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1324  if (col_type.is_fixlen_array()) {
1325  auto encoder =
1326  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1327  CHECK(encoder);
1328  encoder->updateMetadata((int8_t*)daddr);
1329  } else if (col_type.is_fp()) {
1330  set_chunk_stats(col_type,
1331  daddr,
1332  update_stats_per_thread[ci].new_values_stats.has_null,
1333  update_stats_per_thread[ci].new_values_stats.min_double,
1334  update_stats_per_thread[ci].new_values_stats.max_double);
1335  } else {
1336  set_chunk_stats(col_type,
1337  daddr,
1338  update_stats_per_thread[ci].new_values_stats.has_null,
1339  update_stats_per_thread[ci].new_values_stats.min_int64t,
1340  update_stats_per_thread[ci].new_values_stats.max_int64t);
1341  }
1342  }
1343  };
1344 
1345  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1346  size_t nbytes_var_data_to_keep;
1347  if (nrows_to_keep == 0) {
1348  nbytes_var_data_to_keep = 0;
1349  } else {
1350  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1351  }
1352 
1353  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1354  data_buffer->setSize(nbytes_var_data_to_keep);
1355  data_buffer->setUpdated();
1356 
1357  index_buffer->setSize(sizeof(*index_array) *
1358  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1359  index_buffer->setUpdated();
1360 
1361  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1362  };
1363 
1364  if (is_varlen) {
1365  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1366  } else {
1367  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1368  }
1369  if (threads.size() >= (size_t)cpu_threads()) {
1370  wait_cleanup_threads(threads);
1371  }
1372  }
1373 
1374  wait_cleanup_threads(threads);
1375 
1376  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1377  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1378  auto chunk = chunks[ci];
1379  auto cd = chunk->getColumnDesc();
1380  if (!cd->columnType.is_fixlen_array()) {
1381  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1382  // stored in seconds. Do the metadata conversion here before updating the chunk
1383  // stats.
1384  if (cd->columnType.is_date_in_days()) {
1385  auto& stats = update_stats_per_thread[ci].new_values_stats;
1386  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1387  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1388  }
1389  updateColumnMetadata(cd,
1390  fragment,
1391  chunk,
1392  update_stats_per_thread[ci].new_values_stats,
1393  cd->columnType,
1394  updel_roll);
1395  }
1396  }
1397 }
1398 
1399 } // namespace Fragmenter_Namespace
1400 
1401 bool UpdelRoll::commitUpdate() {
1402  if (nullptr == catalog) {
1403  return false;
1404  }
1405  const auto td = catalog->getMetadataForTable(logicalTableId);
1406  CHECK(td);
1407  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
1408  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1409 
1410  // Checkpoint all shards. Otherwise, epochs can go out of sync.
1411  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1412  auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
1413  try {
1414  // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
1415  // `dirtyChunks` has to be cleared before resetting epochs
1416  catalog->checkpoint(logicalTableId);
1417  } catch (...) {
1418  dirty_chunks.clear();
1419  catalog->setTableEpochsLogExceptions(catalog->getDatabaseId(), table_epochs);
1420  throw;
1421  }
1422  }
1423  updateFragmenterAndCleanupChunks();
1424  return true;
1425 }
1426 
1427 void UpdelRoll::stageUpdate() {
1428  CHECK(catalog);
1429  auto db_id = catalog->getDatabaseId();
1430  CHECK(table_descriptor);
1431  auto table_id = table_descriptor->tableId;
1433  CHECK_EQ(table_descriptor->persistenceLevel, Data_Namespace::MemoryLevel::DISK_LEVEL);
1434  try {
1435  catalog->getDataMgr().checkpoint(db_id, table_id, memoryLevel);
1436  } catch (...) {
1437  dirty_chunks.clear();
1438  throw;
1439  }
1440  updateFragmenterAndCleanupChunks();
1441 }
1442 
1443 void UpdelRoll::updateFragmenterAndCleanupChunks() {
1444  // for each dirty fragment
1445  for (auto& cm : chunk_metadata_map_per_fragment) {
1446  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1447  }
1448 
1449  // flush gpu dirty chunks if update was not on gpu
1450  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1451  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1454  }
1455  }
1456  dirty_chunks.clear();
1457 }
1458 
1459 void UpdelRoll::cancelUpdate() {
1460  if (nullptr == catalog) {
1461  return;
1462  }
1463 
1464  // TODO: needed?
1465  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
1466  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1467  if (is_varlen_update) {
1468  int databaseId = catalog->getDatabaseId();
1469  auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
1470 
1471  dirty_chunks.clear();
1472  catalog->setTableEpochs(databaseId, table_epochs);
1473  } else {
1474  const auto td = catalog->getMetadataForTable(logicalTableId);
1475  CHECK(td);
1476  if (td->persistenceLevel != memoryLevel) {
1477  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1478  catalog->getDataMgr().free(chunk->getBuffer());
1479  chunk->setBuffer(nullptr);
1480  }
1481  }
1482  }
1483 }
1484 
1485 void UpdelRoll::addDirtyChunk(std::shared_ptr<Chunk_NS::Chunk> chunk,
1486  int32_t fragment_id) {
1487  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1488  CHECK(catalog);
1489  ChunkKey chunk_key{catalog->getDatabaseId(),
1490  chunk->getColumnDesc()->tableId,
1491  chunk->getColumnDesc()->columnId,
1492  fragment_id};
1493  dirty_chunks[chunk_key] = chunk;
1494 }
1495 
1496 void UpdelRoll::initializeUnsetMetadata(
1497  const TableDescriptor* td,
1498  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1499  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1500  MetaDataKey key{td, &fragment_info};
1501  if (chunk_metadata_map_per_fragment.count(key) == 0) {
1502  chunk_metadata_map_per_fragment[key] =
1503  fragment_info.getChunkMetadataMapPhysicalCopy();
1504  }
1505  if (num_tuples.count(key) == 0) {
1506  num_tuples[key] = fragment_info.shadowNumTuples;
1507  }
1508 }
1509 
1510 std::shared_ptr<ChunkMetadata> UpdelRoll::getChunkMetadata(
1511  const MetaDataKey& key,
1512  int32_t column_id,
1513  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1514  initializeUnsetMetadata(key.first, fragment_info);
1515  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1516  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1517  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1518  auto chunk_metadata_it = metadata_map_it->second.find(column_id);
1519  CHECK(chunk_metadata_it != metadata_map_it->second.end());
1520  return chunk_metadata_it->second;
1521 }
1522 
1523 ChunkMetadataMap UpdelRoll::getChunkMetadataMap(const MetaDataKey& key) const {
1524  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1525  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1526  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1527  return metadata_map_it->second;
1528 }
1529 
1530 size_t UpdelRoll::getNumTuple(const MetaDataKey& key) const {
1531  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1532  auto it = num_tuples.find(key);
1533  CHECK(it != num_tuples.end());
1534  return it->second;
1535 }
1536 
1537 void UpdelRoll::setNumTuple(const MetaDataKey& key, size_t num_tuple) {
1538  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1539  num_tuples[key] = num_tuple;
1540 }
void stageUpdate()
CONSTEXPR DEVICE bool is_null(const T &value)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
bool is_integer() const
Definition: sqltypes.h:512
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
bool is_timeinterval() const
Definition: sqltypes.h:521
std::unique_ptr< std::vector< ArrayDatum > > column_data_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
virtual void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData)=0
virtual size_t const getEntryCount() const =0
std::unique_lock< T > unique_lock
DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
Definition: Catalog.h:298
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4740
void initializeUnsetMetadata(const TableDescriptor *td, Fragmenter_Namespace::FragmentInfo &fragment_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
specifies the content in-memory of a row in the column metadata table
void validate(T value)
Definition: Encoder.h:98
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
bool is_boolean() const
Definition: sqltypes.h:517
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:4754
#define CHECK_LT(x, y)
Definition: Logger.h:232
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:495
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:73
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:1114
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
virtual void convertToColumnarFormat(size_t row, size_t indexInFragment)=0
Data_Namespace::MemoryLevel persistenceLevel
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
virtual StringDictionaryProxy * getLiteralDictionary() const =0
std::unique_ptr< std::vector< std::string > > column_data_
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:153
int logicalTableId
Definition: UpdelRoll.h:54
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
unencoded fixed length array encoder
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
size_t getNumTuple(const MetaDataKey &key) const
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3584
#define CHECK(condition)
Definition: Logger.h:222
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
const ColumnDescriptor * column_descriptor_
Descriptor for a dictionary for a string columne.
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
const BUFFER_DATA_TYPE * data_buffer_addr_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static bool is_null(const SQLTypeInfo &type, int8_t *array)
void setTableEpochsLogExceptions(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3620
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:510
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
void free(AbstractBuffer *buffer)
Definition: DataMgr.cpp:528
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int8_t * numbersPtr
Definition: sqltypes.h:226
unencoded array encoder
void set_minmax(T &min, T &max, T const val)
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:513
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::string columnName
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
bool is_integral(const SQLTypeInfo &t)
std::unique_ptr< INSERT_DATA_TYPE, CheckedMallocDeleter< INSERT_DATA_TYPE >> ColumnDataPtr
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3556
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:154
ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)