OmniSciDB  c1a53651b2
UpdelStorage.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <mutex>
19 #include <string>
20 #include <vector>
21 
22 #include <boost/variant.hpp>
23 #include <boost/variant/get.hpp>
24 
25 #include "Catalog/Catalog.h"
29 #include "LockMgr/LockMgr.h"
30 #include "QueryEngine/Execute.h"
31 #include "Shared/DateConverters.h"
33 #include "Shared/thread_count.h"
35 
36 extern bool g_enable_string_functions;
37 
39 
40 namespace Fragmenter_Namespace {
41 
42 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
43  for (auto& t : threads) {
44  t.get();
45  }
46  threads.clear();
47 }
48 
49 inline bool is_integral(const SQLTypeInfo& t) {
50  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
51 }
52 
53 bool FragmentInfo::unconditionalVacuum_{false};
54 
55 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
56  const TableDescriptor* td,
57  const ColumnDescriptor* cd,
58  const int fragment_id,
59  const std::vector<uint64_t>& frag_offsets,
60  const ScalarTargetValue& rhs_value,
61  const SQLTypeInfo& rhs_type,
62  const Data_Namespace::MemoryLevel memory_level,
63  UpdelRoll& updel_roll) {
64  updateColumn(catalog,
65  td,
66  cd,
67  fragment_id,
68  frag_offsets,
69  std::vector<ScalarTargetValue>(1, rhs_value),
70  rhs_type,
71  memory_level,
72  updel_roll);
73 }
74 
75 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
76  const TableDescriptor* td,
77  const FragmentInfo& fragment,
78  const Data_Namespace::MemoryLevel memory_level,
79  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
80  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
81  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
82  ++nc;
83  if (!cd->isVirtualCol) {
84  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
85  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
86  ChunkKey chunk_key{
87  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
88  auto chunk = Chunk_NS::Chunk::getChunk(cd,
89  &catalog->getDataMgr(),
90  chunk_key,
91  memory_level,
92  0,
93  chunk_meta_it->second->numBytes,
94  chunk_meta_it->second->numElements);
95  chunks.push_back(chunk);
96  }
97  }
98  }
99  return chunks.size();
100 }
101 
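// Chunk-to-InsertData converters: each one snapshots the existing contents of one
// chunk into a columnar InsertData block so that rows touched by a variable-length
// UPDATE can be re-inserted unchanged through the regular insert path.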
102 class ChunkToInsertDataConverter {
103  public:
104  virtual ~ChunkToInsertDataConverter() {}
105 
106  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
107 
108  virtual void addDataBlocksToInsertData(
109  Fragmenter_Namespace::InsertData& insertData) = 0;
110 };
111 
112 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
113 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
114  using ColumnDataPtr =
115  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
116 
117  const Chunk_NS::Chunk* chunk_;
118  ColumnDataPtr column_data_;
119  const ColumnDescriptor* column_descriptor_;
120  const BUFFER_DATA_TYPE* data_buffer_addr_;
121 
122  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
123  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
124  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
125  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
126  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
127  }
128 
129  ~ScalarChunkConverter() override {}
130 
131  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
132  auto buffer_value = data_buffer_addr_[indexInFragment];
133  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
134  column_data_.get()[row] = insert_value;
135  }
136 
137  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
138  DataBlockPtr dataBlock;
139  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
140  insertData.data.push_back(dataBlock);
141  insertData.columnIds.push_back(column_descriptor_->columnId);
142  }
143 };
144 
145 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
146  const Chunk_NS::Chunk* chunk_;
147  const ColumnDescriptor* column_descriptor_;
148 
149  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
150  int8_t* data_buffer_addr_;
151  size_t fixed_array_length_;
152 
153  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
154  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
155  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
156  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
157  fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
158  }
159 
160  ~FixedLenArrayChunkConverter() override {}
161 
162  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
163  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
164 
165  bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
166  src_value_ptr);
167 
168  (*column_data_)[row] = ArrayDatum(
169  fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
170  }
171 
172  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
173  DataBlockPtr dataBlock;
174  dataBlock.arraysPtr = column_data_.get();
175  insertData.data.push_back(dataBlock);
176  insertData.columnIds.push_back(column_descriptor_->columnId);
177  }
178 };
179 
180 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
181  StringOffsetT* index_buffer_addr_;
182 
183  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
184  : FixedLenArrayChunkConverter(num_rows, chunk) {
185  index_buffer_addr_ =
186  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
187  : nullptr);
188  }
189 
190  ~ArrayChunkConverter() override {}
191 
192  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
193  auto startIndex = index_buffer_addr_[indexInFragment];
194  auto endIndex = index_buffer_addr_[indexInFragment + 1];
195  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
196  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
197  (*column_data_)[row] = ArrayDatum(
198  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
199  }
200 };
201 
202 struct StringChunkConverter : public ChunkToInsertDataConverter {
203  const Chunk_NS::Chunk* chunk_;
204  const ColumnDescriptor* column_descriptor_;
205 
206  std::unique_ptr<std::vector<std::string>> column_data_;
207  const int8_t* data_buffer_addr_;
208  const StringOffsetT* index_buffer_addr_;
209 
210  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
211  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
212  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
213  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
214  index_buffer_addr_ =
215  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
216  : nullptr);
217  }
218 
219  ~StringChunkConverter() override {}
220 
221  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
222  size_t src_value_size =
223  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
224  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
225  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
226  }
227 
228  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
229  DataBlockPtr dataBlock;
230  dataBlock.stringsPtr = column_data_.get();
231  insertData.data.push_back(dataBlock);
232  insertData.columnIds.push_back(column_descriptor_->columnId);
233  }
234 };
235 
236 template <typename BUFFER_DATA_TYPE>
237 struct DateChunkConverter : public ChunkToInsertDataConverter {
238  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
239 
240  const Chunk_NS::Chunk* chunk_;
241  ColumnDataPtr column_data_;
242  const ColumnDescriptor* column_descriptor_;
243  const BUFFER_DATA_TYPE* data_buffer_addr_;
244 
245  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
246  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
247  column_data_ = ColumnDataPtr(
248  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
249  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
250  }
251 
252  ~DateChunkConverter() override {}
253 
254  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
255  auto buffer_value = data_buffer_addr_[indexInFragment];
256  auto insert_value = static_cast<int64_t>(buffer_value);
257  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
258  }
259 
260  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
261  DataBlockPtr dataBlock;
262  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
263  insertData.data.push_back(dataBlock);
264  insertData.columnIds.push_back(column_descriptor_->columnId);
265  }
266 };
267 
268 void InsertOrderFragmenter::updateColumns(
269  const Catalog_Namespace::Catalog* catalog,
270  const TableDescriptor* td,
271  const int fragmentId,
272  const std::vector<TargetMetaInfo> sourceMetaInfo,
273  const std::vector<const ColumnDescriptor*> columnDescriptors,
274  const RowDataProvider& sourceDataProvider,
275  const size_t indexOffFragmentOffsetColumn,
276  const Data_Namespace::MemoryLevel memoryLevel,
277  UpdelRoll& updelRoll,
278  Executor* executor) {
279  updelRoll.is_varlen_update = true;
280  updelRoll.catalog = catalog;
281  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
282  updelRoll.memoryLevel = memoryLevel;
283 
284  size_t num_entries = sourceDataProvider.getEntryCount();
285  size_t num_rows = sourceDataProvider.getRowCount();
286 
287  if (0 == num_rows) {
288  // bail out early
289  return;
290  }
291 
292  TargetValueConverterFactory factory;
293 
294  auto fragment_ptr = getFragmentInfo(fragmentId);
295  auto& fragment = *fragment_ptr;
296  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
297  get_chunks(catalog, td, fragment, memoryLevel, chunks);
298  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
299  columnDescriptors.size());
300  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
301  size_t indexOfDeletedColumn{0};
302  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
303  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
304  auto chunk = chunks[indexOfChunk];
305  const auto chunk_cd = chunk->getColumnDesc();
306 
307  if (chunk_cd->isDeletedCol) {
308  indexOfDeletedColumn = chunk_cd->columnId;
309  deletedChunk = chunk;
310  continue;
311  }
312 
313  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
314  columnDescriptors.end(),
315  [=](const ColumnDescriptor* cd) -> bool {
316  return cd->columnId == chunk_cd->columnId;
317  });
318 
319  if (targetColumnIt != columnDescriptors.end()) {
320  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
321 
322  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
323  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
324 
325  ConverterCreateParameter param{
326  num_rows,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  *catalog,
330  targetDescriptor->columnType,
331  !targetDescriptor->columnType.get_notnull(),
332  sourceDataProvider.getLiteralDictionary(),
333  g_enable_string_functions &&
334  sourceDataMetaInfo.get_type_info().is_dict_encoded_string()
335  ? executor->getStringDictionaryProxy(
336  sourceDataMetaInfo.get_type_info().getStringDictKey(),
337  executor->getRowSetMemoryOwner(),
338  true)
339  : nullptr,
340  nullptr};
341  auto converter = factory.create(param);
342  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
343 
344  if (targetDescriptor->columnType.is_geometry()) {
345  // geometry columns are composites
346  // need to skip chunks, depending on geo type
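 // (each geo type is backed by extra physical chunks such as coords, bounds and
 // ring/linestring sizes; the increments below step the loop index past them)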
347  switch (targetDescriptor->columnType.get_type()) {
348  case kMULTIPOLYGON:
349  indexOfChunk += 5;
350  break;
351  case kPOLYGON:
352  indexOfChunk += 4;
353  break;
354  case kMULTILINESTRING:
355  indexOfChunk += 3;
356  break;
357  case kLINESTRING:
358  case kMULTIPOINT:
359  indexOfChunk += 2;
360  break;
361  case kPOINT:
362  indexOfChunk += 1;
363  break;
364  default:
365  CHECK(false); // not supported
366  }
367  }
368  } else {
369  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
370  std::unique_ptr<ChunkToInsertDataConverter> converter;
371 
372  if (chunk_cd->columnType.is_fixlen_array()) {
373  converter =
374  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
375  } else if (chunk_cd->columnType.is_string()) {
376  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
377  } else if (chunk_cd->columnType.is_geometry()) {
378  // the logical geo column is a string column
379  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
380  } else {
381  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
382  }
383 
384  chunkConverters.push_back(std::move(converter));
385 
386  } else if (chunk_cd->columnType.is_date_in_days()) {
387  /* Q: Why do we need this?
388  A: In the variable-length update path we move the chunk content of a column
389  without decoding it. Since it passes through DateDaysEncoder again, the
390  expected value should be in seconds, but here it is still in days.
391  Therefore DateChunkConverter scales the chunk values to seconds, which
392  DateDaysEncoder then encodes back into days.
393  */
394  std::unique_ptr<ChunkToInsertDataConverter> converter;
395  const size_t physical_size = chunk_cd->columnType.get_size();
396  if (physical_size == 2) {
397  converter =
398  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
399  } else if (physical_size == 4) {
400  converter =
401  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
402  } else {
403  CHECK(false);
404  }
405  chunkConverters.push_back(std::move(converter));
406  } else {
407  std::unique_ptr<ChunkToInsertDataConverter> converter;
408  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
409  int logical_size = logical_type.get_size();
410  int physical_size = chunk_cd->columnType.get_size();
411 
412  if (logical_type.is_string()) {
413  // for dicts -> logical = physical
414  logical_size = physical_size;
415  }
416 
417  if (8 == physical_size) {
418  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
419  num_rows, chunk.get());
420  } else if (4 == physical_size) {
421  if (8 == logical_size) {
422  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
423  num_rows, chunk.get());
424  } else {
425  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
426  num_rows, chunk.get());
427  }
428  } else if (2 == chunk_cd->columnType.get_size()) {
429  if (8 == logical_size) {
430  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
431  num_rows, chunk.get());
432  } else if (4 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
434  num_rows, chunk.get());
435  } else {
436  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
437  num_rows, chunk.get());
438  }
439  } else if (1 == chunk_cd->columnType.get_size()) {
440  if (8 == logical_size) {
441  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
442  num_rows, chunk.get());
443  } else if (4 == logical_size) {
444  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
445  num_rows, chunk.get());
446  } else if (2 == logical_size) {
447  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
448  num_rows, chunk.get());
449  } else {
450  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
451  num_rows, chunk.get());
452  }
453  } else {
454  CHECK(false); // unknown
455  }
456 
457  chunkConverters.push_back(std::move(converter));
458  }
459  }
460  }
461 
462  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
463  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
464 
465  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
466  bool* deletedChunkBuffer =
467  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
468 
469  std::atomic<size_t> row_idx{0};
470 
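 // For each source entry: write the updated columns via the per-column source
 // converters, copy the untouched columns of the old row via the chunk converters,
 // and mark the old row as deleted; the rebuilt rows are appended back to the table
 // through insertDataNoCheckpoint() below.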
471  auto row_converter = [&sourceDataProvider,
472  &sourceDataConverters,
473  &indexOffFragmentOffsetColumn,
474  &chunkConverters,
475  &deletedChunkBuffer,
476  &row_idx](size_t indexOfEntry) -> void {
477  // convert the source data
478  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
479  if (row.empty()) {
480  return;
481  }
482 
483  size_t indexOfRow = row_idx.fetch_add(1);
484 
485  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
486  if (sourceDataConverters[col]) {
487  const auto& mapd_variant = row[col];
488  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
489  }
490  }
491 
492  auto scalar = checked_get(
493  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
494  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
495 
496  // convert the remaining chunks
497  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
498  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
499  }
500 
501  // now mark the row as deleted
502  deletedChunkBuffer[indexInChunkBuffer] = true;
503  };
504 
505  bool can_go_parallel = num_rows > 20000;
506 
507  if (can_go_parallel) {
508  const size_t num_worker_threads = cpu_threads();
509  std::vector<std::future<void>> worker_threads;
510  for (size_t i = 0,
511  start_entry = 0,
512  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
513  i < num_worker_threads && start_entry < num_entries;
514  ++i, start_entry += stride) {
515  const auto end_entry = std::min(start_entry + stride, num_rows);
516  worker_threads.push_back(std::async(
517  std::launch::async,
518  [&row_converter](const size_t start, const size_t end) {
519  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
520  row_converter(indexOfRow);
521  }
522  },
523  start_entry,
524  end_entry));
525  }
526 
527  for (auto& child : worker_threads) {
528  child.wait();
529  }
530 
531  } else {
532  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
533  row_converter(entryIdx);
534  }
535  }
536 
537  Fragmenter_Namespace::InsertData insert_data;
538  insert_data.databaseId = catalog->getCurrentDB().dbId;
539  insert_data.tableId = td->tableId;
540 
541  for (size_t i = 0; i < chunkConverters.size(); i++) {
542  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
543  continue;
544  }
545 
546  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
547  if (sourceDataConverters[i]) {
548  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
549  }
550  continue;
551  }
552 
553  insert_data.numRows = num_rows;
554  insert_data.is_default.resize(insert_data.columnIds.size(), false);
555  insertDataNoCheckpoint(insert_data);
556 
557  // update metadata for deleted chunk as we are doing special handling
558  auto chunkMetadata =
559  updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
560  chunkMetadata->chunkStats.max.boolval = 1;
561 
562  // I'm not completely sure that we need to do this both in the fragmenter and on
563  // the buffer, but leaving this alone for now
564  if (!deletedChunk->getBuffer()->hasEncoder()) {
565  deletedChunk->initEncoder();
566  }
567  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
568 
569  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
570  // An append to the same fragment will increase shadowNumTuples.
571  // Update NumElems in this case. Otherwise, use existing NumElems.
572  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
573  }
574  deletedChunk->getBuffer()->setUpdated();
575 }
576 
577 namespace {
578 inline void update_metadata(SQLTypeInfo const& ti,
579  ChunkUpdateStats& update_stats,
580  int64_t const updated_val,
581  int64_t const old_val,
582  NullSentinelSupplier s = NullSentinelSupplier()) {
583  if (ti.get_notnull()) {
584  set_minmax(update_stats.new_values_stats.min_int64t,
585  update_stats.new_values_stats.max_int64t,
586  updated_val);
587  set_minmax(update_stats.old_values_stats.min_int64t,
588  update_stats.old_values_stats.max_int64t,
589  old_val);
590  } else {
591  set_minmax(update_stats.new_values_stats.min_int64t,
592  update_stats.new_values_stats.max_int64t,
593  update_stats.new_values_stats.has_null,
594  updated_val,
595  s(ti, updated_val));
596  set_minmax(update_stats.old_values_stats.min_int64t,
597  update_stats.old_values_stats.max_int64t,
598  update_stats.old_values_stats.has_null,
599  old_val,
600  s(ti, old_val));
601  }
602 }
603 
604 inline void update_metadata(SQLTypeInfo const& ti,
605  ChunkUpdateStats& update_stats,
606  double const updated_val,
607  double const old_val,
608  NullSentinelSupplier s = NullSentinelSupplier()) {
609  if (ti.get_notnull()) {
610  set_minmax(update_stats.new_values_stats.min_double,
611  update_stats.new_values_stats.max_double,
612  updated_val);
613  set_minmax(update_stats.old_values_stats.min_double,
614  update_stats.old_values_stats.max_double,
615  old_val);
616  } else {
617  set_minmax(update_stats.new_values_stats.min_double,
618  update_stats.new_values_stats.max_double,
619  update_stats.new_values_stats.has_null,
620  updated_val,
621  s(ti, updated_val));
622  set_minmax(update_stats.old_values_stats.min_double,
623  update_stats.old_values_stats.max_double,
624  update_stats.old_values_stats.has_null,
625  old_val,
626  s(ti, old_val));
627  }
628 }
629 
630 inline void update_metadata(UpdateValuesStats& agg_stats,
631  const UpdateValuesStats& new_stats) {
632  agg_stats.has_null = agg_stats.has_null || new_stats.has_null;
633  agg_stats.max_double = std::max<double>(agg_stats.max_double, new_stats.max_double);
634  agg_stats.min_double = std::min<double>(agg_stats.min_double, new_stats.min_double);
635  agg_stats.max_int64t = std::max<int64_t>(agg_stats.max_int64t, new_stats.max_int64t);
636  agg_stats.min_int64t = std::min<int64_t>(agg_stats.min_int64t, new_stats.min_int64t);
637 }
638 } // namespace
639 
640 std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
641  const Catalog_Namespace::Catalog* catalog,
642  const TableDescriptor* td,
643  const ColumnDescriptor* cd,
644  const int fragment_id,
645  const std::vector<uint64_t>& frag_offsets,
646  const std::vector<ScalarTargetValue>& rhs_values,
647  const SQLTypeInfo& rhs_type,
648  const Data_Namespace::MemoryLevel memory_level,
649  UpdelRoll& updel_roll) {
650  updel_roll.catalog = catalog;
651  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
652  updel_roll.memoryLevel = memory_level;
653 
654  const size_t ncore = cpu_threads();
655  const auto nrow = frag_offsets.size();
656  const auto n_rhs_values = rhs_values.size();
657  if (0 == nrow) {
658  return {};
659  }
660  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
661 
662  auto fragment_ptr = getFragmentInfo(fragment_id);
663  auto& fragment = *fragment_ptr;
664  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
665  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
666  ChunkKey chunk_key{
667  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
668  auto chunk = Chunk_NS::Chunk::getChunk(cd,
669  &catalog->getDataMgr(),
670  chunk_key,
671  memory_level,
672  0,
673  chunk_meta_it->second->numBytes,
674  chunk_meta_it->second->numElements);
675 
676  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
677 
678  // parallel update elements
679  std::vector<std::future<void>> threads;
680 
681  const auto segsz = (nrow + ncore - 1) / ncore;
682  auto dbuf = chunk->getBuffer();
683  auto dbuf_addr = dbuf->getMemoryPtr();
684  dbuf->setUpdated();
685  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
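 // Split the target offsets into ncore segments; each worker updates its rows in
 // place and accumulates min/max/has_null into update_stats_per_thread[c], which is
 // merged into a single ChunkUpdateStats after all threads finish.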
686  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
687  threads.emplace_back(std::async(
688  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
689  SQLTypeInfo lhs_type = cd->columnType;
690 
691  // !! Not sure if this is an undocumented convention or a bug, but for a sharded
692  // table the dictionary id of a dict-encoded string column is not specified by
693  // comp_param in the physical table but in the logical table; comp_param in the
694  // physical table is always 0, so we need to adapt accordingly...
695  auto cdl = (shard_ < 0)
696  ? cd
697  : catalog->getMetadataForColumn(
698  catalog->getLogicalTableId(td->tableId), cd->columnId);
699  CHECK(cdl);
700  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
701  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
702  lhs_type, &decimalOverflowValidator);
703  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
704  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
705  lhs_type, &dateDaysOverflowValidator);
706 
707  StringDictionary* stringDict{nullptr};
708  if (lhs_type.is_string()) {
709  CHECK(kENCODING_DICT == lhs_type.get_compression());
710  auto dictDesc = const_cast<DictDescriptor*>(
711  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
712  CHECK(dictDesc);
713  stringDict = dictDesc->stringDict.get();
714  CHECK(stringDict);
715  }
716 
717  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
718  const auto roffs = frag_offsets[r];
719  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
720  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
721  ScalarTargetValue sv2;
722 
723  // A subtlety here concerns the two cases of string-to-string assignment, when
724  // upstream passes the RHS string as a string index instead of a preferred "real
725  // string".
726  // case #1. For "SET str_col = str_literal", it is hard to resolve a temp str
727  // index
728  // in this layer, so if upstream passes a str idx here, an
729  // exception is thrown.
730  // case #2. For "SET str_col1 = str_col2", the RHS str idx is converted to the
731  // LHS str idx.
732  if (rhs_type.is_string()) {
733  if (const auto vp = boost::get<int64_t>(sv)) {
734  auto dictDesc = const_cast<DictDescriptor*>(
735  catalog->getMetadataForDict(rhs_type.get_comp_param()));
736  if (nullptr == dictDesc) {
737  throw std::runtime_error(
738  "UPDATE does not support cast from string literal to string "
739  "column.");
740  }
741  auto stringDict = dictDesc->stringDict.get();
742  CHECK(stringDict);
743  sv2 = NullableString(stringDict->getString(*vp));
744  sv = &sv2;
745  }
746  }
747 
748  if (const auto vp = boost::get<int64_t>(sv)) {
749  auto v = *vp;
750  if (lhs_type.is_string()) {
751  throw std::runtime_error("UPDATE does not support cast to string.");
752  }
753  int64_t old_val;
754  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
755  // Handle special case where date column with date in days encoding stores
756  // metadata in epoch seconds.
757  if (lhs_type.is_date_in_days()) {
758  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
759  }
760  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
761  if (lhs_type.is_decimal()) {
762  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
763  int64_t decimal_val;
764  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
765  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
766  lhs_type.get_notnull() == false)
767  ? v
768  : decimal_val;
769  update_metadata(
770  lhs_type, update_stats_per_thread[c], target_value, old_val);
771  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
772  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
773  if (positive_v_and_negative_d || negative_v_and_positive_d) {
774  throw std::runtime_error(
775  "Data conversion overflow on " + std::to_string(v) +
776  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
777  std::to_string(rhs_type.get_scale()) + ") to (" +
778  std::to_string(lhs_type.get_dimension()) + ", " +
779  std::to_string(lhs_type.get_scale()) + ")");
780  }
781  } else if (is_integral(lhs_type)) {
782  if (lhs_type.is_date_in_days()) {
783  // Store meta values in seconds
784  if (lhs_type.get_size() == 2) {
785  nullAwareDateOverflowValidator.validate<int16_t>(v);
786  } else {
787  nullAwareDateOverflowValidator.validate<int32_t>(v);
788  }
789  int64_t days;
790  get_scalar<int64_t>(data_ptr, lhs_type, days);
791  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
792  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
793  lhs_type.get_notnull() == false)
794  ? NullSentinelSupplier()(lhs_type, v)
795  : seconds;
796  update_metadata(
797  lhs_type, update_stats_per_thread[c], target_value, old_val);
798  } else {
799  int64_t target_value;
800  if (rhs_type.is_decimal()) {
801  target_value = round(decimal_to_double(rhs_type, v));
802  } else {
803  target_value = v;
804  }
805  update_metadata(
806  lhs_type, update_stats_per_thread[c], target_value, old_val);
807  }
808  } else {
809  if (rhs_type.is_decimal()) {
810  update_metadata(lhs_type,
811  update_stats_per_thread[c],
812  decimal_to_double(rhs_type, v),
813  double(old_val));
814  } else {
815  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
816  }
817  }
818  } else if (const auto vp = boost::get<double>(sv)) {
819  auto v = *vp;
820  if (lhs_type.is_string()) {
821  throw std::runtime_error("UPDATE does not support cast to string.");
822  }
823  double old_val;
824  get_scalar<double>(data_ptr, lhs_type, old_val);
825  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
826  if (lhs_type.is_integer()) {
827  update_metadata(
828  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
829  } else if (lhs_type.is_fp()) {
830  update_metadata(
831  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
832  } else {
833  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
834  "LHS with a floating RHS.";
835  }
836  } else if (const auto vp = boost::get<float>(sv)) {
837  auto v = *vp;
838  if (lhs_type.is_string()) {
839  throw std::runtime_error("UPDATE does not support cast to string.");
840  }
841  float old_val;
842  get_scalar<float>(data_ptr, lhs_type, old_val);
843  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
844  if (lhs_type.is_integer()) {
845  update_metadata(
846  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
847  } else {
848  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
849  }
850  } else if (const auto vp = boost::get<NullableString>(sv)) {
851  const auto s = boost::get<std::string>(vp);
852  const auto sval = s ? *s : std::string("");
853  if (lhs_type.is_string()) {
854  decltype(stringDict->getOrAdd(sval)) sidx;
855  {
856  std::unique_lock<std::mutex> lock(temp_mutex_);
857  sidx = stringDict->getOrAdd(sval);
858  }
859  int64_t old_val;
860  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
861  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
862  update_metadata(
863  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
864  } else if (sval.size() > 0) {
865  auto dval = std::atof(sval.data());
866  if (lhs_type.is_boolean()) {
867  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
868  } else if (lhs_type.is_time()) {
869  throw std::runtime_error(
870  "Date/Time/Timestamp update not supported through translated "
871  "string path.");
872  }
873  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
874  double old_val;
875  get_scalar<double>(data_ptr, lhs_type, old_val);
876  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
877  update_metadata(
878  lhs_type, update_stats_per_thread[c], double(dval), old_val);
879  } else {
880  int64_t old_val;
881  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
882  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
883  update_metadata(
884  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
885  }
886  } else {
887  put_null(data_ptr, lhs_type, cd->columnName);
888  update_stats_per_thread[c].new_values_stats.has_null = true;
889  }
890  } else {
891  CHECK(false);
892  }
893  }
894  }));
895  if (threads.size() >= (size_t)cpu_threads()) {
896  wait_cleanup_threads(threads);
897  }
898  }
899  wait_cleanup_threads(threads);
900 
901  // for unit test
902  if (FragmentInfo::unconditionalVacuum_) {
903  if (cd->isDeletedCol) {
904  const auto deleted_offsets = getVacuumOffsets(chunk);
905  if (deleted_offsets.size() > 0) {
906  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
907  return {};
908  }
909  }
910  }
911  ChunkUpdateStats update_stats;
912  for (size_t c = 0; c < ncore; ++c) {
913  update_metadata(update_stats.new_values_stats,
914  update_stats_per_thread[c].new_values_stats);
915  update_metadata(update_stats.old_values_stats,
916  update_stats_per_thread[c].old_values_stats);
917  }
918 
919  CHECK_GT(fragment.shadowNumTuples, size_t(0));
920  updateColumnMetadata(
921  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
922  update_stats.updated_rows_count = nrow;
923  update_stats.fragment_rows_count = fragment.shadowNumTuples;
924  update_stats.chunk = chunk;
925  return update_stats;
926 }
927 
928 void InsertOrderFragmenter::updateColumnMetadata(
929  const ColumnDescriptor* cd,
930  FragmentInfo& fragment,
931  std::shared_ptr<Chunk_NS::Chunk> chunk,
932  const UpdateValuesStats& new_values_stats,
933  const SQLTypeInfo& rhs_type,
934  UpdelRoll& updel_roll) {
936  auto buffer = chunk->getBuffer();
937  const auto& lhs_type = cd->columnType;
938 
939  auto encoder = buffer->getEncoder();
940  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
941  static_assert(std::is_same<decltype(min), decltype(max)>::value,
942  "Type mismatch on min/max");
943  if (has_null) {
944  encoder->updateStats(decltype(min)(), true);
945  }
946  if (max < min) {
947  return;
948  }
949  encoder->updateStats(min, false);
950  encoder->updateStats(max, false);
951  };
952 
953  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
954  update_stats(new_values_stats.min_int64t,
955  new_values_stats.max_int64t,
956  new_values_stats.has_null);
957  } else if (lhs_type.is_fp()) {
958  update_stats(new_values_stats.min_double,
959  new_values_stats.max_double,
960  new_values_stats.has_null);
961  } else if (lhs_type.is_decimal()) {
962  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
963  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
964  new_values_stats.has_null);
965  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
966  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
967  update_stats(new_values_stats.min_int64t,
968  new_values_stats.max_int64t,
969  new_values_stats.has_null);
970  }
971  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
972  auto chunk_metadata =
973  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
974  buffer->getEncoder()->getMetadata(chunk_metadata);
975 }
976 
977 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
978  const MetaDataKey& key,
979  UpdelRoll& updel_roll) {
981  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
982  auto& fragmentInfo = *key.second;
983  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
984  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
985  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
986  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
987 }
988 
989 std::vector<std::shared_ptr<Chunk_NS::Chunk>> InsertOrderFragmenter::getChunksForAllColumns(
990  const TableDescriptor* td,
991  const FragmentInfo& fragment,
992  const Data_Namespace::MemoryLevel memory_level) {
993  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
994  // coming from updateColumn (on '$delete$' column) we don't have chunks for all columns
995  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
996  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
997  ++ncol;
998  if (!cd->isVirtualCol) {
999  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1000  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1001  ChunkKey chunk_key{
1002  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1003  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1004  &catalog_->getDataMgr(),
1005  chunk_key,
1006  memory_level,
1007  0,
1008  chunk_meta_it->second->numBytes,
1009  chunk_meta_it->second->numElements);
1010  chunks.push_back(chunk);
1011  }
1012  }
1013  }
1014  return chunks;
1015 }
1016 
1017 // get a sorted vector of offsets of rows to vacuum
1018 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1019  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
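 // The chunk passed in is the boolean $deleted$ column, so a non-zero byte at
 // position r marks row r as deleted. Each worker scans its own contiguous slice and
 // the per-thread offset lists are concatenated in order, yielding a sorted result.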
1020  const auto data_buffer = chunk->getBuffer();
1021  const auto data_addr = data_buffer->getMemoryPtr();
1022  const size_t nrows_in_chunk = data_buffer->size();
1023  const size_t ncore = cpu_threads();
1024  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1025  std::vector<std::vector<uint64_t>> deleted_offsets;
1026  deleted_offsets.resize(ncore);
1027  std::vector<std::future<void>> threads;
1028  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1029  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1030  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1031  const auto ithread = rbegin / segsz;
1032  CHECK(ithread < deleted_offsets.size());
1033  deleted_offsets[ithread].reserve(segsz);
1034  for (size_t r = rbegin; r < rend; ++r) {
1035  if (data_addr[r]) {
1036  deleted_offsets[ithread].push_back(r);
1037  }
1038  }
1039  }));
1040  }
1041  wait_cleanup_threads(threads);
1042  std::vector<uint64_t> all_deleted_offsets;
1043  for (size_t i = 0; i < ncore; ++i) {
1044  all_deleted_offsets.insert(
1045  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1046  }
1047  return all_deleted_offsets;
1048 }
1049 
1050 template <typename T>
1051 static void set_chunk_stats(const SQLTypeInfo& col_type,
1052  int8_t* data_addr,
1053  bool& has_null,
1054  T& min,
1055  T& max) {
1056  T v;
1057  const auto can_be_null = !col_type.get_notnull();
1058  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1059  if (is_null) {
1060  has_null = has_null || (can_be_null && is_null);
1061  } else {
1062  set_minmax(min, max, v);
1063  }
1064 }
1065 
1066 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1067  FragmentInfo& fragment,
1068  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1069  const size_t nrows_to_keep,
1070  UpdelRoll& updel_roll) {
1071  auto cd = chunk->getColumnDesc();
1072  auto td = catalog->getMetadataForTable(cd->tableId);
1073  auto data_buffer = chunk->getBuffer();
1074  auto chunkMetadata =
1075  updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
1076  chunkMetadata->numElements = nrows_to_keep;
1077  chunkMetadata->numBytes = data_buffer->size();
1078  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
1079 }
1080 
1081 auto vacuum_fixlen_rows(
1082  const FragmentInfo& fragment,
1083  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1084  const std::vector<uint64_t>& frag_offsets) {
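 // Compact the fixed-width chunk in place: walk the sorted delete offsets and
 // memmove each block of surviving rows up against the previously kept block,
 // returning the number of bytes that remain.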
1085  const auto cd = chunk->getColumnDesc();
1086  const auto& col_type = cd->columnType;
1087  auto data_buffer = chunk->getBuffer();
1088  auto data_addr = data_buffer->getMemoryPtr();
1089  auto element_size =
1090  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1091  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1092  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1093  size_t nbytes_fix_data_to_keep = 0;
1094  auto nrows_to_vacuum = frag_offsets.size();
1095  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1096  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1097  auto is_last_one = irow == nrows_to_vacuum;
1098  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1099  auto maddr_to_vacuum = data_addr;
1100  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1101  if (nrows_to_keep > 0) {
1102  auto nbytes_to_keep = nrows_to_keep * element_size;
1103  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1104  // move curr fixlen row block toward front
1105  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1106  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1107  nbytes_to_keep);
1108  }
1109  irow_of_blk_to_fill += nrows_to_keep;
1110  nbytes_fix_data_to_keep += nbytes_to_keep;
1111  }
1112  irow_of_blk_to_keep = irow_to_vacuum + 1;
1113  }
1114  return nbytes_fix_data_to_keep;
1115 }
1116 
1117 // Gets the initial padding required for the chunk buffer. For variable length array
1118 // columns, if the first element after vacuuming is going to be a null array, a padding
1119 // with a value that is greater than 0 is expected.
1120 size_t get_null_padding(bool is_varlen_array,
1121  const std::vector<uint64_t>& frag_offsets,
1122  const StringOffsetT* index_array,
1123  size_t fragment_row_count) {
1124  if (is_varlen_array) {
1125  size_t first_non_deleted_row_index{0};
1126  for (auto deleted_offset : frag_offsets) {
1127  if (first_non_deleted_row_index < deleted_offset) {
1128  break;
1129  } else {
1130  first_non_deleted_row_index++;
1131  }
1132  }
1133  CHECK_LT(first_non_deleted_row_index, fragment_row_count);
1134  if (first_non_deleted_row_index == 0) {
1135  // If the first row in the fragment is not deleted, then the first offset in the
1136  // index buffer/array already contains expected padding.
1137  return index_array[0];
1138  } else {
1139  // If the first non-deleted element is a null array (identified by a negative
1140  // offset), get a padding value for the chunk buffer.
1141  if (index_array[first_non_deleted_row_index + 1] < 0) {
1142  size_t first_non_zero_offset{0};
1143  for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
1144  if (index_array[i] != 0) {
1145  first_non_zero_offset = index_array[i];
1146  break;
1147  }
1148  }
1149  CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
1150  return std::min(static_cast<size_t>(std::abs(index_array[first_non_deleted_row_index])),
1151  first_non_zero_offset);
1152  } else {
1153  return 0;
1154  }
1155  }
1156  } else {
1157  return 0;
1158  }
1159 }
1160 
1161 // Gets the indexes of variable length null arrays in the chunk after vacuuming.
1162 std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
1163  const std::vector<uint64_t>& frag_offsets,
1164  const StringOffsetT* index_array,
1165  size_t fragment_row_count) {
1166  std::set<size_t> null_array_indexes;
1167  if (sql_type_info.is_varlen_array() && !sql_type_info.get_notnull()) {
1168  size_t frag_offset_index{0};
1169  size_t vacuum_offset{0};
1170  for (size_t i = 0; i < fragment_row_count; i++) {
1171  if (frag_offset_index < frag_offsets.size() &&
1172  i == frag_offsets[frag_offset_index]) {
1173  frag_offset_index++;
1174  vacuum_offset++;
1175  } else if (index_array[i + 1] < 0) {
1176  null_array_indexes.emplace(i - vacuum_offset);
1177  }
1178  }
1179  }
1180  return null_array_indexes;
1181 }
1182 
1183 StringOffsetT get_buffer_offset(bool is_varlen_array,
1184  const StringOffsetT* index_array,
1185  size_t index) {
1186  auto offset = index_array[index];
1187  if (offset < 0) {
1188  // Variable length arrays encode null arrays as negative offsets
1189  CHECK(is_varlen_array);
1190  offset = -offset;
1191  }
1192  return offset;
1193 }
1194 
1195 auto vacuum_varlen_rows(
1196  const FragmentInfo& fragment,
1197  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1198  const std::vector<uint64_t>& frag_offsets) {
1199  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1200  auto data_buffer = chunk->getBuffer();
1201  CHECK(data_buffer);
1202  auto index_buffer = chunk->getIndexBuf();
1203  CHECK(index_buffer);
1204  auto data_addr = data_buffer->getMemoryPtr();
1205  auto indices_addr = index_buffer->getMemoryPtr();
1206  CHECK(indices_addr);
1207  auto index_array = (StringOffsetT*)indices_addr;
1208  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1209  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1210  size_t nbytes_fix_data_to_keep = 0;
1211  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1212  size_t null_padding =
1213  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1214  size_t nbytes_var_data_to_keep = null_padding;
1215  auto null_array_indexes = get_var_len_null_array_indexes(
1216  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1217  auto nrows_to_vacuum = frag_offsets.size();
1218  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1219  auto is_last_one = irow == nrows_to_vacuum;
1220  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1221  auto maddr_to_vacuum = data_addr;
1222  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1223  if (nrows_to_keep > 0) {
1224  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1225  auto deleted_row_start_offset =
1226  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1227  auto kept_row_start_offset =
1228  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1229  auto nbytes_to_keep =
1230  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1231  kept_row_start_offset;
1232  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1233  if (nbytes_to_keep > 0) {
1234  CHECK(data_addr);
1235  // move curr varlen row block toward front
1236  memmove(data_addr + ibyte_var_data_to_keep,
1237  data_addr + kept_row_start_offset,
1238  nbytes_to_keep);
1239  }
1240 
1241  const auto base_offset = kept_row_start_offset;
1242  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1243  auto update_index = irow_of_blk_to_keep + i;
1244  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1245  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1246  }
1247  }
1248  nbytes_var_data_to_keep += nbytes_to_keep;
1249  maddr_to_vacuum = indices_addr;
1250 
1251  constexpr static auto index_element_size = sizeof(StringOffsetT);
1252  nbytes_to_keep = nrows_to_keep * index_element_size;
1253  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1254  // move curr fixlen row block toward front
1255  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1256  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1257  nbytes_to_keep);
1258  }
1259  irow_of_blk_to_fill += nrows_to_keep;
1260  nbytes_fix_data_to_keep += nbytes_to_keep;
1261  }
1262  irow_of_blk_to_keep = irow_to_vacuum + 1;
1263  }
1264 
1265  // Set expected null padding, last offset, and negative values for null array offsets.
1266  index_array[0] = null_padding;
1267  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1268  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1269  if (!is_varlen_array) {
1270  CHECK(null_array_indexes.empty());
1271  }
1272  for (auto index : null_array_indexes) {
1273  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1274  }
1275  return nbytes_var_data_to_keep;
1276 }
1277 
1278 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1279  const TableDescriptor* td,
1280  const int fragment_id,
1281  const std::vector<uint64_t>& frag_offsets,
1282  const Data_Namespace::MemoryLevel memory_level,
1283  UpdelRoll& updel_roll) {
1284  auto fragment_ptr = getFragmentInfo(fragment_id);
1285  auto& fragment = *fragment_ptr;
1286  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1287  const auto ncol = chunks.size();
1288 
1289  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1290 
1291  // parallel delete columns
1292  std::vector<std::future<void>> threads;
1293  auto nrows_to_vacuum = frag_offsets.size();
1294  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1295  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1296 
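 // Vacuum every column chunk of the fragment in parallel: fixed-width chunks are
 // compacted and their stats recomputed element by element, while varlen chunks also
 // rewrite their offset index buffer.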
1297  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1298  auto chunk = chunks[ci];
1299  const auto cd = chunk->getColumnDesc();
1300  const auto& col_type = cd->columnType;
1301  auto data_buffer = chunk->getBuffer();
1302  auto index_buffer = chunk->getIndexBuf();
1303  auto data_addr = data_buffer->getMemoryPtr();
1304  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1305  auto index_array = (StringOffsetT*)indices_addr;
1306  bool is_varlen = col_type.is_varlen_indeed();
1307 
1308  auto fixlen_vacuum =
1309  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1310  size_t nbytes_fix_data_to_keep;
1311  if (nrows_to_keep == 0) {
1312  nbytes_fix_data_to_keep = 0;
1313  } else {
1314  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1315  }
1316 
1317  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1318  data_buffer->setSize(nbytes_fix_data_to_keep);
1319  data_buffer->setUpdated();
1320 
1321  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1322 
1323  auto daddr = data_addr;
1324  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1325  : get_element_size(col_type);
1326  data_buffer->getEncoder()->resetChunkStats();
1327  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1328  if (col_type.is_fixlen_array()) {
1329  auto encoder =
1330  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1331  CHECK(encoder);
1332  encoder->updateMetadata((int8_t*)daddr);
1333  } else if (col_type.is_fp()) {
1334  set_chunk_stats(col_type,
1335  daddr,
1336  update_stats_per_thread[ci].new_values_stats.has_null,
1337  update_stats_per_thread[ci].new_values_stats.min_double,
1338  update_stats_per_thread[ci].new_values_stats.max_double);
1339  } else {
1340  set_chunk_stats(col_type,
1341  daddr,
1342  update_stats_per_thread[ci].new_values_stats.has_null,
1343  update_stats_per_thread[ci].new_values_stats.min_int64t,
1344  update_stats_per_thread[ci].new_values_stats.max_int64t);
1345  }
1346  }
1347  };
1348 
1349  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1350  size_t nbytes_var_data_to_keep;
1351  if (nrows_to_keep == 0) {
1352  nbytes_var_data_to_keep = 0;
1353  } else {
1354  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1355  }
1356 
1357  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1358  data_buffer->setSize(nbytes_var_data_to_keep);
1359  data_buffer->setUpdated();
1360 
1361  index_buffer->setSize(sizeof(*index_array) *
1362  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1363  index_buffer->setUpdated();
1364 
1365  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1366  };
1367 
1368  if (is_varlen) {
1369  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1370  } else {
1371  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1372  }
1373  if (threads.size() >= (size_t)cpu_threads()) {
1374  wait_cleanup_threads(threads);
1375  }
1376  }
1377 
1378  wait_cleanup_threads(threads);
1379 
1380  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
1381  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1382  auto chunk = chunks[ci];
1383  auto cd = chunk->getColumnDesc();
1384  if (!cd->columnType.is_fixlen_array()) {
1385  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1386  // stored in seconds. Do the metadata conversion here before updating the chunk
1387  // stats.
1388  if (cd->columnType.is_date_in_days()) {
1389  auto& stats = update_stats_per_thread[ci].new_values_stats;
1390  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1391  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1392  }
1393  updateColumnMetadata(cd,
1394  fragment,
1395  chunk,
1396  update_stats_per_thread[ci].new_values_stats,
1397  cd->columnType,
1398  updel_roll);
1399  }
1400  }
1401 }
1402 
1403 } // namespace Fragmenter_Namespace
1404 
1405 bool UpdelRoll::commitUpdate() {
1406  if (nullptr == catalog) {
1407  return false;
1408  }
1409  const auto td = catalog->getMetadataForTable(logicalTableId);
1410  CHECK(td);
1411  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
1412  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1413 
1414  // Checkpoint all shards. Otherwise, epochs can go out of sync.
1415  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1416  auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
1417  try {
1418  // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
1419  // `dirtyChunks` has to be cleared before resetting epochs
1420  catalog->checkpoint(logicalTableId);
1421  } catch (...) {
1422  dirty_chunks.clear();
1423  catalog->setTableEpochsLogExceptions(catalog->getDatabaseId(), table_epochs);
1424  throw;
1425  }
1426  }
1427  updateFragmenterAndCleanupChunks();
1428  return true;
1429 }
1430 
1431 void UpdelRoll::stageUpdate() {
1432  CHECK(catalog);
1433  auto db_id = catalog->getDatabaseId();
1434  CHECK(table_descriptor);
1435  auto table_id = table_descriptor->tableId;
1436  CHECK_EQ(memoryLevel, Data_Namespace::MemoryLevel::CPU_LEVEL);
1437  CHECK_EQ(table_descriptor->persistenceLevel, Data_Namespace::MemoryLevel::DISK_LEVEL);
1438  try {
1439  catalog->getDataMgr().checkpoint(db_id, table_id, memoryLevel);
1440  } catch (...) {
1441  dirty_chunks.clear();
1442  throw;
1443  }
1444  updateFragmenterAndCleanupChunks();
1445 }
1446 
1447 void UpdelRoll::updateFragmenterAndCleanupChunks() {
1448  // for each dirty fragment
1449  for (auto& cm : chunk_metadata_map_per_fragment) {
1450  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1451  }
1452 
1453  // flush gpu dirty chunks if update was not on gpu
1454  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1455  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1456  catalog->getDataMgr().free(chunk->getBuffer());
1457  chunk->setBuffer(nullptr);
1458  }
1459  }
1460  dirty_chunks.clear();
1461 }
1462 
1463 void UpdelRoll::cancelUpdate() {
1464  if (nullptr == catalog) {
1465  return;
1466  }
1467 
1468  // TODO: needed?
1469  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
1470  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1471  if (is_varlen_update) {
1472  int databaseId = catalog->getDatabaseId();
1473  auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
1474 
1475  dirty_chunks.clear();
1476  catalog->setTableEpochs(databaseId, table_epochs);
1477  } else {
1478  const auto td = catalog->getMetadataForTable(logicalTableId);
1479  CHECK(td);
1480  if (td->persistenceLevel != memoryLevel) {
1481  for (const auto& [chunk_key, chunk] : dirty_chunks) {
1482  catalog->getDataMgr().free(chunk->getBuffer());
1483  chunk->setBuffer(nullptr);
1484  }
1485  }
1486  }
1487 }
1488 
1489 void UpdelRoll::addDirtyChunk(std::shared_ptr<Chunk_NS::Chunk> chunk,
1490  int32_t fragment_id) {
1491  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1492  CHECK(catalog);
1493  ChunkKey chunk_key{catalog->getDatabaseId(),
1494  chunk->getColumnDesc()->tableId,
1495  chunk->getColumnDesc()->columnId,
1496  fragment_id};
1497  dirty_chunks[chunk_key] = chunk;
1498 }
1499 
1500 void UpdelRoll::initializeUnsetMetadata(
1501  const TableDescriptor* td,
1502  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1503  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1504  MetaDataKey key{td, &fragment_info};
1505  if (chunk_metadata_map_per_fragment.count(key) == 0) {
1506  chunk_metadata_map_per_fragment[key] =
1507  fragment_info.getChunkMetadataMapPhysicalCopy();
1508  }
1509  if (num_tuples.count(key) == 0) {
1510  num_tuples[key] = fragment_info.shadowNumTuples;
1511  }
1512 }
1513 
1514 std::shared_ptr<ChunkMetadata> UpdelRoll::getChunkMetadata(
1515  const MetaDataKey& key,
1516  int32_t column_id,
1517  Fragmenter_Namespace::FragmentInfo& fragment_info) {
1518  initializeUnsetMetadata(key.first, fragment_info);
1519  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1520  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1521  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1522  auto chunk_metadata_it = metadata_map_it->second.find(column_id);
1523  CHECK(chunk_metadata_it != metadata_map_it->second.end());
1524  return chunk_metadata_it->second;
1525 }
1526 
1527 ChunkMetadataMap UpdelRoll::getChunkMetadataMap(const MetaDataKey& key) const {
1528  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1529  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
1530  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
1531  return metadata_map_it->second;
1532 }
1533 
1534 size_t UpdelRoll::getNumTuple(const MetaDataKey& key) const {
1535  heavyai::shared_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1536  auto it = num_tuples.find(key);
1537  CHECK(it != num_tuples.end());
1538  return it->second;
1539 }
1540 
1541 void UpdelRoll::setNumTuple(const MetaDataKey& key, size_t num_tuple) {
1542  heavyai::unique_lock<heavyai::shared_mutex> lock(chunk_update_tracker_mutex);
1543  num_tuples[key] = num_tuple;
1544 }
int64_t get_epoch_seconds_from_days(const int64_t days)
bool commitUpdate()
void stageUpdate()
CONSTEXPR DEVICE bool is_null(const T &value)
const Catalog_Namespace::Catalog * catalog
Definition: UpdelRoll.h:53
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:248
bool is_integer() const
Definition: sqltypes.h:582
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
bool is_timeinterval() const
Definition: sqltypes.h:591
std::unique_ptr< std::vector< ArrayDatum > > column_data_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
virtual void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData)=0
virtual size_t const getEntryCount() const =0
std::unique_lock< T > unique_lock
DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
Definition: Catalog.h:304
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4839
void initializeUnsetMetadata(const TableDescriptor *td, Fragmenter_Namespace::FragmentInfo &fragment_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1999
specifies the content in-memory of a row in the column metadata table
void validate(T value)
Definition: Encoder.h:98
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
bool is_boolean() const
Definition: sqltypes.h:587
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:4853
#define CHECK_LT(x, y)
Definition: Logger.h:303
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:492
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:389
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:73
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:1259
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
virtual void convertToColumnarFormat(size_t row, size_t indexInFragment)=0
Data_Namespace::MemoryLevel persistenceLevel
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:383
virtual StringDictionaryProxy * getLiteralDictionary() const =0
std::unique_ptr< std::vector< std::string > > column_data_
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:179
int logicalTableId
Definition: UpdelRoll.h:54
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
unencoded fixed length array encoder
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:392
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
size_t getNumTuple(const MetaDataKey &key) const
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3658
#define CHECK(condition)
Definition: Logger.h:291
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
const ColumnDescriptor * column_descriptor_
Descriptor for a dictionary for a string columne.
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
const BUFFER_DATA_TYPE * data_buffer_addr_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static bool is_null(const SQLTypeInfo &type, int8_t *array)
void setTableEpochsLogExceptions(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3694
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
bool is_string() const
Definition: sqltypes.h:580
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
void free(AbstractBuffer *buffer)
Definition: DataMgr.cpp:525
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:388
int8_t * numbersPtr
Definition: sqltypes.h:223
unencoded array encoder
void set_minmax(T &min, T &max, T const val)
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
int cpu_threads()
Definition: thread_count.h:25
bool is_decimal() const
Definition: sqltypes.h:583
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:71
std::string columnName
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
bool is_integral(const SQLTypeInfo &t)
std::unique_ptr< INSERT_DATA_TYPE, CheckedMallocDeleter< INSERT_DATA_TYPE >> ColumnDataPtr
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3630
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180
ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)