UpdelStorage.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <mutex>
#include <string>
#include <vector>

#include <boost/variant.hpp>
#include <boost/variant/get.hpp>

#include "Catalog/Catalog.h"
#include "DataMgr/Chunk/Chunk.h"
#include "Fragmenter/InsertOrderFragmenter.h"
#include "LockMgr/LockMgr.h"
#include "QueryEngine/Execute.h"
#include "Shared/DateConverters.h"
#include "Shared/TypedDataAccessors.h"
#include "Shared/thread_count.h"
#include "TargetValueConvertersFactories.h"
#include "UpdelRoll.h"

extern bool g_enable_experimental_string_functions;
extern bool g_enable_auto_metadata_update;

namespace Fragmenter_Namespace {

inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
  for (auto& t : threads) {
    t.get();
  }
  threads.clear();
}

inline bool is_integral(const SQLTypeInfo& t) {
  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
}
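
// Example (illustrative sketch, not taken from the original file): helpers above
// support a batching pattern used throughout this file -- cap the number of
// in-flight std::async tasks at cpu_threads(), draining the futures vector
// between batches via wait_cleanup_threads().
namespace {
[[maybe_unused]] void example_batched_dispatch(size_t num_tasks) {
  std::vector<std::future<void>> threads;
  for (size_t i = 0; i < num_tasks; ++i) {
    threads.emplace_back(std::async(std::launch::async, [i] {
      (void)i;  // ... per-task work for segment i goes here ...
    }));
    if (threads.size() >= static_cast<size_t>(cpu_threads())) {
      wait_cleanup_threads(threads);  // block until the current batch finishes
    }
  }
  wait_cleanup_threads(threads);  // drain any remaining tasks
}
}  // namespace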

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const ScalarTargetValue& rhs_value,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updateColumn(catalog,
               td,
               cd,
               fragment_id,
               frag_offsets,
               std::vector<ScalarTargetValue>(1, rhs_value),
               rhs_type,
               memory_level,
               updel_roll);
}

static int get_chunks(const Catalog_Namespace::Catalog* catalog,
                      const TableDescriptor* td,
                      const FragmentInfo& fragment,
                      const Data_Namespace::MemoryLevel memory_level,
                      std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
    if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
      ++nc;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks.size();
}

class ChunkToInsertDataConverter {
 public:
  virtual ~ChunkToInsertDataConverter() {}

  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;

  virtual void addDataBlocksToInsertData(
      Fragmenter_Namespace::InsertData& insertData) = 0;
};

template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
struct ScalarChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr =
      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
        checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~ScalarChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
    column_data_.get()[row] = insert_value;
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};
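
// Example (illustrative sketch, not taken from the original file): the two
// template parameters of ScalarChunkConverter decouple the physical buffer type
// from the logical insert type, so a chunk stored as int32_t can be widened to
// int64_t while being copied out. The same idea with plain std::vector:
namespace {
[[maybe_unused]] void example_widening_copy() {
  std::vector<int32_t> buffer{1, 2, 3};        // physical chunk data (4 bytes each)
  std::vector<int64_t> column(buffer.size());  // logical columnar data (8 bytes each)
  for (size_t row = 0; row < buffer.size(); ++row) {
    column[row] = static_cast<int64_t>(buffer[row]);  // the convertToColumnarFormat step
  }
}
}  // namespace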

struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
  int8_t* data_buffer_addr_;
  size_t fixed_array_length_;

  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
  }

  ~FixedLenArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);

    bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
                                                        src_value_ptr);

    (*column_data_)[row] = ArrayDatum(
        fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.arraysPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
  StringOffsetT* index_buffer_addr_;

  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : FixedLenArrayChunkConverter(num_rows, chunk) {
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~ArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto startIndex = index_buffer_addr_[indexInFragment];
    auto endIndex = index_buffer_addr_[indexInFragment + 1];
    size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = ArrayDatum(
        src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
  }
};

struct StringChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<std::string>> column_data_;
  const int8_t* data_buffer_addr_;
  const StringOffsetT* index_buffer_addr_;

  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~StringChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    size_t src_value_size =
        index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.stringsPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

template <typename BUFFER_DATA_TYPE>
struct DateChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(
        reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~DateChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<int64_t>(buffer_value);
    column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};
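
// Example (illustrative sketch, not taken from the original file): for non-null
// values the days-to-seconds scaling performed by DateChunkConverter is a single
// multiply, e.g. 18'000 days since epoch * 86'400 seconds/day = 1'555'200'000
// epoch seconds. (DateConverters::get_epoch_seconds_from_days encapsulates this
// conversion.)
namespace {
[[maybe_unused]] int64_t example_days_to_epoch_seconds(int64_t days) {
  constexpr int64_t kSecondsPerDay = 86'400;
  return days * kSecondsPerDay;
}
}  // namespace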

void InsertOrderFragmenter::updateColumns(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const int fragmentId,
    const std::vector<TargetMetaInfo> sourceMetaInfo,
    const std::vector<const ColumnDescriptor*> columnDescriptors,
    const RowDataProvider& sourceDataProvider,
    const size_t indexOffFragmentOffsetColumn,
    const Data_Namespace::MemoryLevel memoryLevel,
    UpdelRoll& updelRoll,
    Executor* executor) {
  updelRoll.is_varlen_update = true;
  updelRoll.catalog = catalog;
  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updelRoll.memoryLevel = memoryLevel;

  size_t num_entries = sourceDataProvider.getEntryCount();
  size_t num_rows = sourceDataProvider.getRowCount();

  if (0 == num_rows) {
    // bail out early
    return;
  }

  TargetValueConverterFactory factory;

  auto fragment_ptr = getFragmentInfo(fragmentId);
  auto& fragment = *fragment_ptr;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
  size_t indexOfDeletedColumn{0};
  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->getColumnDesc();

    if (chunk_cd->isDeletedCol) {
      indexOfDeletedColumn = chunk_cd->columnId;
      deletedChunk = chunk;
      continue;
    }

    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });

    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);

      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];

      ConverterCreateParameter param{
          num_rows,
          *catalog,
          sourceDataMetaInfo,
          targetDescriptor,
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
          sourceDataProvider.getLiteralDictionary(),
          g_enable_experimental_string_functions
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().get_comp_param(),
                    executor->getRowSetMemoryOwner(),
                    true)
              : nullptr};
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);

      if (targetDescriptor->columnType.is_geometry()) {
        // geometry columns are composites; depending on the geo type, additional
        // physical chunks must be skipped
        switch (targetDescriptor->columnType.get_type()) {
          case kMULTIPOLYGON:
            indexOfChunk += 5;
            break;
          case kPOLYGON:
            indexOfChunk += 4;
            break;
          case kLINESTRING:
            indexOfChunk += 2;
            break;
          case kPOINT:
            indexOfChunk += 1;
            break;
          default:
            CHECK(false);  // not supported
        }
      }
    } else {
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;

        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          // the logical geo column is a string column
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }

        chunkConverters.push_back(std::move(converter));

      } else if (chunk_cd->columnType.is_date_in_days()) {
        /* Q: Why do we need this?
           A: In the variable-length update path, the chunk content of a column is
           moved without decoding. Because the data pass through DateDaysEncoder
           again, the expected values must be in seconds, but here they are in
           days. DateChunkConverter therefore scales the chunk values to seconds,
           which DateDaysEncoder ultimately re-encodes as days. */
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
        } else {
          CHECK(false);
        }
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();

        if (logical_type.is_string()) {
          // for dictionary-encoded strings, logical size equals physical size
          logical_size = physical_size;
        }

        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        } else {
          CHECK(false);  // unknown physical size
        }

        chunkConverters.push_back(std::move(converter));
      }
    }
  }

  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;

  updelRoll.addDirtyChunk(deletedChunk, fragment.fragmentId);
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());

  std::atomic<size_t> row_idx{0};

  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        &chunkConverters,
                        &deletedChunkBuffer,
                        &row_idx](size_t indexOfEntry) -> void {
    // convert the source data
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
    if (row.empty()) {
      return;
    }

    size_t indexOfRow = row_idx.fetch_add(1);

    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      }
    }

    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);

    // convert the remaining chunks
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
    }

    // now mark the row as deleted
    deletedChunkBuffer[indexInChunkBuffer] = true;
  };

  bool can_go_parallel = num_rows > 20000;

  if (can_go_parallel) {
    const size_t num_worker_threads = cpu_threads();
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, num_rows);
      worker_threads.push_back(std::async(
          std::launch::async,
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
              row_converter(indexOfRow);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : worker_threads) {
      child.wait();
    }

  } else {
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
    }
  }
529 
531  insert_data.databaseId = catalog->getCurrentDB().dbId;
532  insert_data.tableId = td->tableId;
533 
534  for (size_t i = 0; i < chunkConverters.size(); i++) {
535  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
536  continue;
537  }
538 
539  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
540  if (sourceDataConverters[i]) {
541  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
542  }
543  continue;
544  }
545 
546  insert_data.numRows = num_rows;
547  insert_data.is_default.resize(insert_data.columnIds.size(), false);
548  insertDataNoCheckpoint(insert_data);

  // update metadata for the deleted chunk, since it gets special handling
  auto chunkMetadata =
      updelRoll.getChunkMetadata({td, &fragment}, indexOfDeletedColumn, fragment);
  chunkMetadata->chunkStats.max.boolval = 1;

  // I'm not completely sure that we need to do this on both the fragment and the
  // buffer, but leaving it alone for now
  if (!deletedChunk->getBuffer()->hasEncoder()) {
    deletedChunk->initEncoder();
  }
  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);

  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
    // An append to the same fragment will increase shadowNumTuples.
    // Update NumElems in this case. Otherwise, use existing NumElems.
    deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
  }
  deletedChunk->getBuffer()->setUpdated();
}
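
// Example (illustrative sketch, not taken from the original file): the parallel
// path in updateColumns() above splits [0, num_entries) into at most
// cpu_threads() contiguous strides of ceil(num_entries / num_worker_threads)
// entries each. The partition arithmetic in isolation:
namespace {
struct ExampleStride {
  size_t begin;
  size_t end;  // exclusive
};
[[maybe_unused]] std::vector<ExampleStride> example_strides(size_t num_entries,
                                                            size_t num_workers) {
  std::vector<ExampleStride> ranges;
  const size_t stride = (num_entries + num_workers - 1) / num_workers;  // round up
  for (size_t start = 0; start < num_entries; start += stride) {
    ranges.push_back({start, std::min(start + stride, num_entries)});
  }
  return ranges;
}
}  // namespace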

namespace {
inline void update_metadata(SQLTypeInfo const& ti,
                            ChunkUpdateStats& update_stats,
                            int64_t const updated_val,
                            int64_t const old_val,
                            NullSentinelSupplier s = NullSentinelSupplier()) {
  if (ti.get_notnull()) {
    set_minmax(update_stats.new_values_stats.min_int64t,
               update_stats.new_values_stats.max_int64t,
               updated_val);
    set_minmax(update_stats.old_values_stats.min_int64t,
               update_stats.old_values_stats.max_int64t,
               old_val);
  } else {
    set_minmax(update_stats.new_values_stats.min_int64t,
               update_stats.new_values_stats.max_int64t,
               update_stats.new_values_stats.has_null,
               updated_val,
               s(ti, updated_val));
    set_minmax(update_stats.old_values_stats.min_int64t,
               update_stats.old_values_stats.max_int64t,
               update_stats.old_values_stats.has_null,
               old_val,
               s(ti, old_val));
  }
}

inline void update_metadata(SQLTypeInfo const& ti,
                            ChunkUpdateStats& update_stats,
                            double const updated_val,
                            double const old_val,
                            NullSentinelSupplier s = NullSentinelSupplier()) {
  if (ti.get_notnull()) {
    set_minmax(update_stats.new_values_stats.min_double,
               update_stats.new_values_stats.max_double,
               updated_val);
    set_minmax(update_stats.old_values_stats.min_double,
               update_stats.old_values_stats.max_double,
               old_val);
  } else {
    set_minmax(update_stats.new_values_stats.min_double,
               update_stats.new_values_stats.max_double,
               update_stats.new_values_stats.has_null,
               updated_val,
               s(ti, updated_val));
    set_minmax(update_stats.old_values_stats.min_double,
               update_stats.old_values_stats.max_double,
               update_stats.old_values_stats.has_null,
               old_val,
               s(ti, old_val));
  }
}

inline void update_metadata(UpdateValuesStats& agg_stats,
                            const UpdateValuesStats& new_stats) {
  agg_stats.has_null = agg_stats.has_null || new_stats.has_null;
  agg_stats.max_double = std::max<double>(agg_stats.max_double, new_stats.max_double);
  agg_stats.min_double = std::min<double>(agg_stats.min_double, new_stats.min_double);
  agg_stats.max_int64t = std::max<int64_t>(agg_stats.max_int64t, new_stats.max_int64t);
  agg_stats.min_int64t = std::min<int64_t>(agg_stats.min_int64t, new_stats.min_int64t);
}
}  // namespace

std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const ColumnDescriptor* cd,
    const int fragment_id,
    const std::vector<uint64_t>& frag_offsets,
    const std::vector<ScalarTargetValue>& rhs_values,
    const SQLTypeInfo& rhs_type,
    const Data_Namespace::MemoryLevel memory_level,
    UpdelRoll& updel_roll) {
  updel_roll.catalog = catalog;
  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updel_roll.memoryLevel = memory_level;

  const size_t ncore = cpu_threads();
  const auto nrow = frag_offsets.size();
  const auto n_rhs_values = rhs_values.size();
  if (0 == nrow) {
    return {};
  }
  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);

  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
  ChunkKey chunk_key{
      catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
  auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                         &catalog->getDataMgr(),
                                         chunk_key,
                                         memory_level,
                                         0,
                                         chunk_meta_it->second->numBytes,
                                         chunk_meta_it->second->numElements);

  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);

  // parallel update elements
  std::vector<std::future<void>> threads;

  const auto segsz = (nrow + ncore - 1) / ncore;
  auto dbuf = chunk->getBuffer();
  auto dbuf_addr = dbuf->getMemoryPtr();
  dbuf->setUpdated();
  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
    threads.emplace_back(std::async(
        std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
          SQLTypeInfo lhs_type = cd->columnType;

          // !! Not sure whether this is an undocumented convention or a bug: for a
          // sharded table, the dictionary id of a dictionary-encoded string column
          // is specified by comp_param in the logical table, not the physical one.
          // comp_param in the physical table is always 0, so adapt accordingly.
          auto cdl = (shard_ < 0)
                         ? cd
                         : catalog->getMetadataForColumn(
                               catalog->getLogicalTableId(td->tableId), cd->columnId);
          CHECK(cdl);
          DecimalOverflowValidator decimalOverflowValidator(lhs_type);
          NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
              lhs_type, &decimalOverflowValidator);
          DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
          NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
              lhs_type, &dateDaysOverflowValidator);

          StringDictionary* stringDict{nullptr};
          if (lhs_type.is_string()) {
            CHECK(kENCODING_DICT == lhs_type.get_compression());
            auto dictDesc = const_cast<DictDescriptor*>(
                catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
            CHECK(dictDesc);
            stringDict = dictDesc->stringDict.get();
            CHECK(stringDict);
          }

          for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
            const auto roffs = frag_offsets[r];
            auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
            auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
            ScalarTargetValue sv2;

            // A subtlety for the two string-to-string assignment cases, where
            // upstream passes the RHS string as a string index instead of a
            // preferred "real string":
            // case #1: for "SET str_col = str_literal", a temp string index is
            //          hard to resolve in this layer, so if upstream passes a
            //          string index here, an exception is thrown.
            // case #2: for "SET str_col1 = str_col2", the RHS string index is
            //          converted to an LHS string index.
            if (rhs_type.is_string()) {
              if (const auto vp = boost::get<int64_t>(sv)) {
                auto dictDesc = const_cast<DictDescriptor*>(
                    catalog->getMetadataForDict(rhs_type.get_comp_param()));
                if (nullptr == dictDesc) {
                  throw std::runtime_error(
                      "UPDATE does not support cast from string literal to string "
                      "column.");
                }
                auto stringDict = dictDesc->stringDict.get();
                CHECK(stringDict);
                sv2 = NullableString(stringDict->getString(*vp));
                sv = &sv2;
              }
            }

            if (const auto vp = boost::get<int64_t>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              int64_t old_val;
              get_scalar<int64_t>(data_ptr, lhs_type, old_val);
              // Handle special case where a date column with date-in-days encoding
              // stores metadata in epoch seconds.
              if (lhs_type.is_date_in_days()) {
                old_val = DateConverters::get_epoch_seconds_from_days(old_val);
              }
              put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
              if (lhs_type.is_decimal()) {
                nullAwareDecimalOverflowValidator.validate<int64_t>(v);
                int64_t decimal_val;
                get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
                int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                                        lhs_type.get_notnull() == false)
                                           ? v
                                           : decimal_val;
                update_metadata(
                    lhs_type, update_stats_per_thread[c], target_value, old_val);
                auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
                auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
                if (positive_v_and_negative_d || negative_v_and_positive_d) {
                  throw std::runtime_error(
                      "Data conversion overflow on " + std::to_string(v) +
                      " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
                      std::to_string(rhs_type.get_scale()) + ") to (" +
                      std::to_string(lhs_type.get_dimension()) + ", " +
                      std::to_string(lhs_type.get_scale()) + ")");
                }
              } else if (is_integral(lhs_type)) {
                if (lhs_type.is_date_in_days()) {
                  // Store meta values in seconds
                  if (lhs_type.get_size() == 2) {
                    nullAwareDateOverflowValidator.validate<int16_t>(v);
                  } else {
                    nullAwareDateOverflowValidator.validate<int32_t>(v);
                  }
                  int64_t days;
                  get_scalar<int64_t>(data_ptr, lhs_type, days);
                  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
                  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
                                          lhs_type.get_notnull() == false)
                                             ? NullSentinelSupplier()(lhs_type, v)
                                             : seconds;
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], target_value, old_val);
                } else {
                  int64_t target_value;
                  if (rhs_type.is_decimal()) {
                    target_value = round(decimal_to_double(rhs_type, v));
                  } else {
                    target_value = v;
                  }
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], target_value, old_val);
                }
              } else {
                if (rhs_type.is_decimal()) {
                  update_metadata(lhs_type,
                                  update_stats_per_thread[c],
                                  decimal_to_double(rhs_type, v),
                                  double(old_val));
                } else {
                  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
                }
              }
            } else if (const auto vp = boost::get<double>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              double old_val;
              get_scalar<double>(data_ptr, lhs_type, old_val);
              put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                update_metadata(
                    lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
              } else if (lhs_type.is_fp()) {
                update_metadata(
                    lhs_type, update_stats_per_thread[c], double(v), double(old_val));
              } else {
                UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                                 "LHS with a floating RHS.";
              }
            } else if (const auto vp = boost::get<float>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              float old_val;
              get_scalar<float>(data_ptr, lhs_type, old_val);
              put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                update_metadata(
                    lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
              } else {
                update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
              }
            } else if (const auto vp = boost::get<NullableString>(sv)) {
              const auto s = boost::get<std::string>(vp);
              const auto sval = s ? *s : std::string("");
              if (lhs_type.is_string()) {
                decltype(stringDict->getOrAdd(sval)) sidx;
                {
                  std::unique_lock<std::mutex> lock(temp_mutex_);
                  sidx = stringDict->getOrAdd(sval);
                }
                int64_t old_val;
                get_scalar<int64_t>(data_ptr, lhs_type, old_val);
                put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
                update_metadata(
                    lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
              } else if (sval.size() > 0) {
                auto dval = std::atof(sval.data());
                if (lhs_type.is_boolean()) {
                  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
                } else if (lhs_type.is_time()) {
                  throw std::runtime_error(
                      "Date/Time/Timestamp update not supported through translated "
                      "string path.");
                }
                if (lhs_type.is_fp() || lhs_type.is_decimal()) {
                  double old_val;
                  get_scalar<double>(data_ptr, lhs_type, old_val);
                  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], double(dval), old_val);
                } else {
                  int64_t old_val;
                  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
                  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
                  update_metadata(
                      lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
                }
              } else {
                put_null(data_ptr, lhs_type, cd->columnName);
                update_stats_per_thread[c].new_values_stats.has_null = true;
              }
            } else {
              CHECK(false);
            }
          }
        }));
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }
  wait_cleanup_threads(threads);

  // for unit test
  if (Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_) {
    if (cd->isDeletedCol) {
      const auto deleted_offsets = getVacuumOffsets(chunk);
      if (deleted_offsets.size() > 0) {
        compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
        return {};
      }
    }
  }
  ChunkUpdateStats update_stats;
  for (size_t c = 0; c < ncore; ++c) {
    update_metadata(update_stats.new_values_stats,
                    update_stats_per_thread[c].new_values_stats);
    update_metadata(update_stats.old_values_stats,
                    update_stats_per_thread[c].old_values_stats);
  }

  CHECK_GT(fragment.shadowNumTuples, size_t(0));
  updateColumnMetadata(
      cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
  update_stats.updated_rows_count = nrow;
  update_stats.fragment_rows_count = fragment.shadowNumTuples;
  update_stats.chunk = chunk;
  return update_stats;
}
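
// Example (illustrative sketch, not taken from the original file): the per-row
// dispatch in updateColumn() above relies on the pointer form of boost::get,
// which yields nullptr when the variant currently holds a different
// alternative. Reduced to its core:
namespace {
[[maybe_unused]] double example_variant_dispatch(
    const boost::variant<int64_t, double>& value) {
  if (const auto int_ptr = boost::get<int64_t>(&value)) {
    return static_cast<double>(*int_ptr);  // integer branch
  } else if (const auto double_ptr = boost::get<double>(&value)) {
    return *double_ptr;  // floating-point branch
  }
  return 0.0;  // unreachable for this two-alternative variant
}
}  // namespace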

void InsertOrderFragmenter::updateColumnMetadata(
    const ColumnDescriptor* cd,
    FragmentInfo& fragment,
    std::shared_ptr<Chunk_NS::Chunk> chunk,
    const UpdateValuesStats& new_values_stats,
    const SQLTypeInfo& rhs_type,
    UpdelRoll& updel_roll) {
  mapd_unique_lock<mapd_shared_mutex> write_lock(fragmentInfoMutex_);
  auto buffer = chunk->getBuffer();
  const auto& lhs_type = cd->columnType;

  auto encoder = buffer->getEncoder();
  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
    if (has_null) {
      encoder->updateStats(decltype(min)(), true);
    }
    if (max < min) {
      return;
    }
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
  };

  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
    update_stats(new_values_stats.min_int64t,
                 new_values_stats.max_int64t,
                 new_values_stats.has_null);
  } else if (lhs_type.is_fp()) {
    update_stats(new_values_stats.min_double,
                 new_values_stats.max_double,
                 new_values_stats.has_null);
  } else if (lhs_type.is_decimal()) {
    update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
                 (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
                 new_values_stats.has_null);
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
    update_stats(new_values_stats.min_int64t,
                 new_values_stats.max_int64t,
                 new_values_stats.has_null);
  }
  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
  auto chunk_metadata =
      updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
  buffer->getEncoder()->getMetadata(chunk_metadata);
}

void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
                                           const MetaDataKey& key,
                                           UpdelRoll& updel_roll) {
  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  const auto chunk_metadata_map = updel_roll.getChunkMetadataMap(key);
  auto& fragmentInfo = *key.second;
  fragmentInfo.setChunkMetadataMap(chunk_metadata_map);
  fragmentInfo.shadowChunkMetadataMap = fragmentInfo.getChunkMetadataMapPhysicalCopy();
  fragmentInfo.shadowNumTuples = updel_roll.getNumTuple(key);
  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
}

std::vector<std::shared_ptr<Chunk_NS::Chunk>>
InsertOrderFragmenter::getChunksForAllColumns(
    const TableDescriptor* td,
    const FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel memory_level) {
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  // coming from updateColumn (on the '$delete$' column) we don't have chunks for
  // all columns
  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
    if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
      ++ncol;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks;
}

// get a sorted vector of offsets of rows to vacuum
const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
    const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
  const auto data_buffer = chunk->getBuffer();
  const auto data_addr = data_buffer->getMemoryPtr();
  const size_t nrows_in_chunk = data_buffer->size();
  const size_t ncore = cpu_threads();
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
  std::vector<std::vector<uint64_t>> deleted_offsets;
  deleted_offsets.resize(ncore);
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
      const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
      const auto ithread = rbegin / segsz;
      CHECK(ithread < deleted_offsets.size());
      deleted_offsets[ithread].reserve(segsz);
      for (size_t r = rbegin; r < rend; ++r) {
        if (data_addr[r]) {
          deleted_offsets[ithread].push_back(r);
        }
      }
    }));
  }
  wait_cleanup_threads(threads);
  std::vector<uint64_t> all_deleted_offsets;
  for (size_t i = 0; i < ncore; ++i) {
    all_deleted_offsets.insert(
        all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
  }
  return all_deleted_offsets;
}

template <typename T>
static void set_chunk_stats(const SQLTypeInfo& col_type,
                            int8_t* data_addr,
                            bool& has_null,
                            T& min,
                            T& max) {
  T v;
  const auto can_be_null = !col_type.get_notnull();
  const auto is_null = get_scalar<T>(data_addr, col_type, v);
  if (is_null) {
    has_null = has_null || (can_be_null && is_null);
  } else {
    set_minmax(min, max, v);
  }
}

static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
                               FragmentInfo& fragment,
                               const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                               const size_t nrows_to_keep,
                               UpdelRoll& updel_roll) {
  auto cd = chunk->getColumnDesc();
  auto td = catalog->getMetadataForTable(cd->tableId);
  auto data_buffer = chunk->getBuffer();
  auto chunkMetadata =
      updel_roll.getChunkMetadata({td, &fragment}, cd->columnId, fragment);
  chunkMetadata->numElements = nrows_to_keep;
  chunkMetadata->numBytes = data_buffer->size();
  updel_roll.addDirtyChunk(chunk, fragment.fragmentId);
}

auto vacuum_fixlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  const auto cd = chunk->getColumnDesc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->getBuffer();
  auto data_addr = data_buffer->getMemoryPtr();
  auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move the current fixed-length row block toward the front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}
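
// Example (illustrative sketch, not taken from the original file): the
// block-compaction loop above, applied to a one-byte element type. Deleting
// offsets {1, 3} from |a b c d e| memmoves "c" down to index 1 and "e" to
// index 2, leaving 3 surviving elements.
namespace {
[[maybe_unused]] size_t example_compact_bytes(int8_t* data,
                                              size_t num_rows,
                                              const std::vector<uint64_t>& deleted) {
  int64_t fill = 0;  // row offset to fit the kept block
  int64_t keep = 0;  // head of next row block to keep
  for (size_t i = 0; i <= deleted.size(); ++i) {
    const int64_t next_deleted = (i == deleted.size()) ? num_rows : deleted[i];
    const int64_t nrows_to_keep = next_deleted - keep;
    if (nrows_to_keep > 0) {
      memmove(data + fill, data + keep, nrows_to_keep);  // shift kept block left
      fill += nrows_to_keep;
    }
    keep = next_deleted + 1;
  }
  return fill;  // number of rows kept
}
}  // namespace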

// Gets the initial padding required for the chunk buffer. For variable-length array
// columns, if the first element after vacuuming is going to be a null array, a padding
// with a value that is greater than 0 is expected.
size_t get_null_padding(bool is_varlen_array,
                        const std::vector<uint64_t>& frag_offsets,
                        const StringOffsetT* index_array,
                        size_t fragment_row_count) {
  if (is_varlen_array) {
    size_t first_non_deleted_row_index{0};
    for (auto deleted_offset : frag_offsets) {
      if (first_non_deleted_row_index < deleted_offset) {
        break;
      } else {
        first_non_deleted_row_index++;
      }
    }
    CHECK_LT(first_non_deleted_row_index, fragment_row_count);
    if (first_non_deleted_row_index == 0) {
      // If the first row in the fragment is not deleted, then the first offset in the
      // index buffer/array already contains the expected padding.
      return index_array[0];
    } else {
      // If the first non-deleted element is a null array (identified by a negative
      // offset), get a padding value for the chunk buffer.
      if (index_array[first_non_deleted_row_index + 1] < 0) {
        size_t first_non_zero_offset{0};
        for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
          if (index_array[i] != 0) {
            first_non_zero_offset = index_array[i];
            break;
          }
        }
        CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
        return std::min(
            static_cast<size_t>(std::abs(index_array[first_non_deleted_row_index + 1])),
            first_non_zero_offset);
      } else {
        return 0;
      }
    }
  } else {
    return 0;
  }
}

// Gets the indexes of variable-length null arrays in the chunk after vacuuming.
std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
                                                const std::vector<uint64_t>& frag_offsets,
                                                const StringOffsetT* index_array,
                                                size_t fragment_row_count) {
  std::set<size_t> null_array_indexes;
  if (sql_type_info.is_varlen_array() && !sql_type_info.get_notnull()) {
    size_t frag_offset_index{0};
    size_t vacuum_offset{0};
    for (size_t i = 0; i < fragment_row_count; i++) {
      if (frag_offset_index < frag_offsets.size() &&
          i == frag_offsets[frag_offset_index]) {
        frag_offset_index++;
        vacuum_offset++;
      } else if (index_array[i + 1] < 0) {
        null_array_indexes.emplace(i - vacuum_offset);
      }
    }
  }
  return null_array_indexes;
}

StringOffsetT get_buffer_offset(bool is_varlen_array,
                                const StringOffsetT* index_array,
                                size_t index) {
  auto offset = index_array[index];
  if (offset < 0) {
    // Variable-length arrays encode null arrays as negative offsets
    CHECK(is_varlen_array);
    offset = -offset;
  }
  return offset;
}
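
// Example (illustrative sketch, not taken from the original file): with the
// negative-offset convention handled by get_buffer_offset() above, an index
// array {0, 8, -16, 24} describes row 0 as bytes [0, 8), row 1 as a NULL array
// occupying bytes [8, 16), and row 2 as bytes [16, 24). A row is NULL exactly
// when its end offset is negative:
namespace {
[[maybe_unused]] bool example_is_null_varlen_row(const StringOffsetT* index_array,
                                                 size_t row) {
  return index_array[row + 1] < 0;
}
}  // namespace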

auto vacuum_varlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
  auto data_buffer = chunk->getBuffer();
  CHECK(data_buffer);
  auto index_buffer = chunk->getIndexBuf();
  CHECK(index_buffer);
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer->getMemoryPtr();
  CHECK(indices_addr);
  auto index_array = (StringOffsetT*)indices_addr;
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  size_t null_padding =
      get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
  size_t nbytes_var_data_to_keep = null_padding;
  auto null_array_indexes = get_var_len_null_array_indexes(
      chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
  auto nrows_to_vacuum = frag_offsets.size();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto deleted_row_start_offset =
          get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
      auto kept_row_start_offset =
          get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
          kept_row_start_offset;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        if (nbytes_to_keep > 0) {
          CHECK(data_addr);
          // move the current varlen row block toward the front
          memmove(data_addr + ibyte_var_data_to_keep,
                  data_addr + kept_row_start_offset,
                  nbytes_to_keep);
        }

        const auto base_offset = kept_row_start_offset;
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto update_index = irow_of_blk_to_keep + i;
          auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
          index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
        }
      }
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;

      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move the current fixed-length offset block toward the front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }

  // Set expected null padding, last offset, and negative values for null array offsets.
  index_array[0] = null_padding;
  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
  if (!is_varlen_array) {
    CHECK(null_array_indexes.empty());
  }
  for (auto index : null_array_indexes) {
    index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
  }
  return nbytes_var_data_to_keep;
}

void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
                                        const TableDescriptor* td,
                                        const int fragment_id,
                                        const std::vector<uint64_t>& frag_offsets,
                                        const Data_Namespace::MemoryLevel memory_level,
                                        UpdelRoll& updel_roll) {
  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
  const auto ncol = chunks.size();

  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);

  // parallel delete columns
  std::vector<std::future<void>> threads;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;

  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    const auto cd = chunk->getColumnDesc();
    const auto& col_type = cd->columnType;
    auto data_buffer = chunk->getBuffer();
    auto index_buffer = chunk->getIndexBuf();
    auto data_addr = data_buffer->getMemoryPtr();
    auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
    auto index_array = (StringOffsetT*)indices_addr;
    bool is_varlen = col_type.is_varlen_indeed();

    auto fixlen_vacuum =
        [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
          size_t nbytes_fix_data_to_keep;
          if (nrows_to_keep == 0) {
            nbytes_fix_data_to_keep = 0;
          } else {
            nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
          }

          data_buffer->getEncoder()->setNumElems(nrows_to_keep);
          data_buffer->setSize(nbytes_fix_data_to_keep);
          data_buffer->setUpdated();

          set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);

          auto daddr = data_addr;
          auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
                                                         : get_element_size(col_type);
          data_buffer->getEncoder()->resetChunkStats();
          for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
            if (col_type.is_fixlen_array()) {
              auto encoder =
                  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
              CHECK(encoder);
              encoder->updateMetadata((int8_t*)daddr);
            } else if (col_type.is_fp()) {
              set_chunk_stats(col_type,
                              daddr,
                              update_stats_per_thread[ci].new_values_stats.has_null,
                              update_stats_per_thread[ci].new_values_stats.min_double,
                              update_stats_per_thread[ci].new_values_stats.max_double);
            } else {
              set_chunk_stats(col_type,
                              daddr,
                              update_stats_per_thread[ci].new_values_stats.has_null,
                              update_stats_per_thread[ci].new_values_stats.min_int64t,
                              update_stats_per_thread[ci].new_values_stats.max_int64t);
            }
          }
        };

    auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_var_data_to_keep;
      if (nrows_to_keep == 0) {
        nbytes_var_data_to_keep = 0;
      } else {
        nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
      }

      data_buffer->getEncoder()->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_var_data_to_keep);
      data_buffer->setUpdated();

      index_buffer->setSize(sizeof(*index_array) *
                            (nrows_to_keep ? 1 + nrows_to_keep : 0));
      index_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
    };

    if (is_varlen) {
      threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
    } else {
      threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
    }
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }

  wait_cleanup_threads(threads);

  updel_roll.setNumTuple({td, &fragment}, nrows_to_keep);
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    auto cd = chunk->getColumnDesc();
    if (!cd->columnType.is_fixlen_array()) {
      // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
      // stored in seconds. Do the metadata conversion here before updating the chunk
      // stats.
      if (cd->columnType.is_date_in_days()) {
        auto& stats = update_stats_per_thread[ci].new_values_stats;
        stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
        stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
      }
      updateColumnMetadata(cd,
                           fragment,
                           chunk,
                           update_stats_per_thread[ci].new_values_stats,
                           cd->columnType,
                           updel_roll);
    }
  }
}

}  // namespace Fragmenter_Namespace

bool UpdelRoll::commitUpdate() {
  if (nullptr == catalog) {
    return false;
  }
  const auto td = catalog->getMetadataForTable(logicalTableId);
  CHECK(td);
  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);

  // Checkpoint all shards. Otherwise, epochs can go out of sync.
  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
    try {
      // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
      // `dirtyChunks` has to be cleared before resetting epochs
      catalog->checkpoint(logicalTableId);
    } catch (...) {
      dirty_chunks.clear();
      catalog->setTableEpochsLogExceptions(catalog->getDatabaseId(), table_epochs);
      throw;
    }
  }
  updateFragmenterAndCleanupChunks();
  return true;
}
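
// Example (illustrative sketch, not taken from the original file): commitUpdate()
// uses a manual capture/try/restore sequence instead of an automatic-rollback
// helper because dirty_chunks must be cleared before the epochs are reset. The
// shape of that pattern in isolation:
namespace {
template <typename Commit, typename Rollback>
[[maybe_unused]] void example_commit_or_rollback(Commit commit, Rollback rollback) {
  try {
    commit();  // e.g. checkpoint the table
  } catch (...) {
    rollback();  // e.g. clear dirty chunks, then reset table epochs
    throw;       // propagate the original error
  }
}
}  // namespace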

void UpdelRoll::stageUpdate() {
  CHECK(catalog);
  auto db_id = catalog->getDatabaseId();
  CHECK(table_descriptor);
  auto table_id = table_descriptor->tableId;
  CHECK_EQ(memoryLevel, Data_Namespace::MemoryLevel::CPU_LEVEL);
  CHECK_EQ(table_descriptor->persistenceLevel, Data_Namespace::MemoryLevel::DISK_LEVEL);
  try {
    catalog->getDataMgr().checkpoint(db_id, table_id, memoryLevel);
  } catch (...) {
    dirty_chunks.clear();
    throw;
  }
  updateFragmenterAndCleanupChunks();
}

void UpdelRoll::updateFragmenterAndCleanupChunks() {
  // for each dirty fragment
  for (auto& cm : chunk_metadata_map_per_fragment) {
    cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
  }

  // flush gpu dirty chunks if update was not on gpu
  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
    for (const auto& [chunk_key, chunk] : dirty_chunks) {
      catalog->getDataMgr().deleteChunksWithPrefix(
          chunk_key, Data_Namespace::MemoryLevel::GPU_LEVEL);
    }
  }
  dirty_chunks.clear();
}

void UpdelRoll::cancelUpdate() {
  if (nullptr == catalog) {
    return;
  }

  // TODO: needed?
  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
  if (is_varlen_update) {
    int databaseId = catalog->getDatabaseId();
    auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);

    dirty_chunks.clear();
    catalog->setTableEpochs(databaseId, table_epochs);
  } else {
    const auto td = catalog->getMetadataForTable(logicalTableId);
    CHECK(td);
    if (td->persistenceLevel != memoryLevel) {
      for (const auto& [chunk_key, chunk] : dirty_chunks) {
        catalog->getDataMgr().free(chunk->getBuffer());
        chunk->setBuffer(nullptr);
      }
    }
  }
}

void UpdelRoll::addDirtyChunk(std::shared_ptr<Chunk_NS::Chunk> chunk,
                              int32_t fragment_id) {
  mapd_unique_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  CHECK(catalog);
  ChunkKey chunk_key{catalog->getDatabaseId(),
                     chunk->getColumnDesc()->tableId,
                     chunk->getColumnDesc()->columnId,
                     fragment_id};
  dirty_chunks[chunk_key] = chunk;
}

void UpdelRoll::initializeUnsetMetadata(
    const TableDescriptor* td,
    Fragmenter_Namespace::FragmentInfo& fragment_info) {
  mapd_unique_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  MetaDataKey key{td, &fragment_info};
  if (chunk_metadata_map_per_fragment.count(key) == 0) {
    chunk_metadata_map_per_fragment[key] =
        fragment_info.getChunkMetadataMapPhysicalCopy();
  }
  if (num_tuples.count(key) == 0) {
    num_tuples[key] = fragment_info.shadowNumTuples;
  }
}

std::shared_ptr<ChunkMetadata> UpdelRoll::getChunkMetadata(
    const MetaDataKey& key,
    int32_t column_id,
    Fragmenter_Namespace::FragmentInfo& fragment_info) {
  initializeUnsetMetadata(key.first, fragment_info);
  mapd_shared_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
  auto chunk_metadata_it = metadata_map_it->second.find(column_id);
  CHECK(chunk_metadata_it != metadata_map_it->second.end());
  return chunk_metadata_it->second;
}

ChunkMetadataMap UpdelRoll::getChunkMetadataMap(const MetaDataKey& key) const {
  mapd_shared_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  auto metadata_map_it = chunk_metadata_map_per_fragment.find(key);
  CHECK(metadata_map_it != chunk_metadata_map_per_fragment.end());
  return metadata_map_it->second;
}

size_t UpdelRoll::getNumTuple(const MetaDataKey& key) const {
  mapd_shared_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  auto it = num_tuples.find(key);
  CHECK(it != num_tuples.end());
  return it->second;
}

void UpdelRoll::setNumTuple(const MetaDataKey& key, size_t num_tuple) {
  mapd_unique_lock<mapd_shared_mutex> lock(chunk_update_tracker_mutex);
  num_tuples[key] = num_tuple;
}
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:222
bool is_integer() const
Definition: sqltypes.h:506
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:77
static void set_chunk_stats(const SQLTypeInfo &col_type, int8_t *data_addr, bool &has_null, T &min, T &max)
ChunkMetadataMap getChunkMetadataMap(const MetaDataKey &key) const
bool is_timeinterval() const
Definition: sqltypes.h:515
std::unique_ptr< std::vector< ArrayDatum > > column_data_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
virtual void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData)=0
virtual size_t const getEntryCount() const =0
DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
Definition: Catalog.h:277
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4246
void initializeUnsetMetadata(const TableDescriptor *td, Fragmenter_Namespace::FragmentInfo &fragment_info)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1524
specifies the content in-memory of a row in the column metadata table
void validate(T value)
Definition: Encoder.h:98
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
bool is_boolean() const
Definition: sqltypes.h:511
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:4260
#define CHECK_LT(x, y)
Definition: Logger.h:219
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:442
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:337
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
AbstractBuffer * getBuffer() const
Definition: Chunk.h:107
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:958
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
virtual void convertToColumnarFormat(size_t row, size_t indexInFragment)=0
bool g_enable_experimental_string_functions
Data_Namespace::MemoryLevel persistenceLevel
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:331
virtual StringDictionaryProxy * getLiteralDictionary() const =0
std::unique_ptr< std::vector< std::string > > column_data_
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
int logicalTableId
Definition: UpdelRoll.h:53
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
unencoded fixed length array encoder
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
std::shared_ptr< ChunkMetadata > getChunkMetadata(const MetaDataKey &key, int32_t column_id, Fragmenter_Namespace::FragmentInfo &fragment_info)
size_t getNumTuple(const MetaDataKey &key) const
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3127
#define CHECK(condition)
Definition: Logger.h:209
char * t
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
const ColumnDescriptor * column_descriptor_
Descriptor for a dictionary for a string columne.
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
const BUFFER_DATA_TYPE * data_buffer_addr_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static bool is_null(const SQLTypeInfo &type, int8_t *array)
mapd_unique_lock< mapd_shared_mutex > write_lock
void setTableEpochsLogExceptions(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3164
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:504
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
void free(AbstractBuffer *buffer)
Definition: DataMgr.cpp:475
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int8_t * numbersPtr
Definition: sqltypes.h:226
unencoded array encoder
void set_minmax(T &min, T &max, T const val)
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:507
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::string columnName
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
bool is_integral(const SQLTypeInfo &t)
std::unique_ptr< INSERT_DATA_TYPE, CheckedMallocDeleter< INSERT_DATA_TYPE >> ColumnDataPtr
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3099
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
virtual size_t const getRowCount() const =0
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)