OmniSciDB  fe05a0c208
UpdelStorage.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <mutex>
18 #include <string>
19 #include <vector>
20 
21 #include <boost/variant.hpp>
22 #include <boost/variant/get.hpp>
23 
24 #include "Catalog/Catalog.h"
28 #include "LockMgr/LockMgr.h"
29 #include "QueryEngine/Execute.h"
30 #include "Shared/DateConverters.h"
32 #include "Shared/thread_count.h"
34 
36 
38 
39 namespace Fragmenter_Namespace {
40 
41 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
42  for (auto& t : threads) {
43  t.get();
44  }
45  threads.clear();
46 }
47 
48 inline bool is_integral(const SQLTypeInfo& t) {
49  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
50 }
51 
53 
54 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
55  const TableDescriptor* td,
56  const ColumnDescriptor* cd,
57  const int fragment_id,
58  const std::vector<uint64_t>& frag_offsets,
59  const ScalarTargetValue& rhs_value,
60  const SQLTypeInfo& rhs_type,
61  const Data_Namespace::MemoryLevel memory_level,
62  UpdelRoll& updel_roll) {
63  updateColumn(catalog,
64  td,
65  cd,
66  fragment_id,
67  frag_offsets,
68  std::vector<ScalarTargetValue>(1, rhs_value),
69  rhs_type,
70  memory_level,
71  updel_roll);
72 }
73 
74 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
75  const TableDescriptor* td,
76  const FragmentInfo& fragment,
77  const Data_Namespace::MemoryLevel memory_level,
78  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
79  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
80  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
81  ++nc;
82  if (!cd->isVirtualCol) {
83  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
84  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
85  ChunkKey chunk_key{
86  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
87  auto chunk = Chunk_NS::Chunk::getChunk(cd,
88  &catalog->getDataMgr(),
89  chunk_key,
90  memory_level,
91  0,
92  chunk_meta_it->second->numBytes,
93  chunk_meta_it->second->numElements);
94  chunks.push_back(chunk);
95  }
96  }
97  }
98  return chunks.size();
99 }
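// Example (added; not part of the original source): for a table with dbId 1,
// tableId 7, and fragmentId 0, get_chunks() pins one chunk per non-virtual
// column, each addressed by a ChunkKey of the form {1, 7, <columnId>, 0}, and
// returns the number of chunks collected.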
100 
101 class ChunkToInsertDataConverter {
102  public:
103  virtual ~ChunkToInsertDataConverter() {}
104 
105  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
106 
107  virtual void addDataBlocksToInsertData(
108  Fragmenter_Namespace::InsertData& insertData) = 0;
109 };
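The converter hierarchy below follows a two-phase contract: convertToColumnarFormat() copies one surviving fragment row into a columnar staging buffer, and addDataBlocksToInsertData() hands that buffer to an InsertData payload. A minimal driver sketch of that contract (added for illustration only, not part of the original source; the function name and parameters are hypothetical):

void run_converters_sketch(
    std::vector<std::unique_ptr<ChunkToInsertDataConverter>>& converters,
    const std::vector<size_t>& surviving_offsets,
    Fragmenter_Namespace::InsertData& insert_data) {
  // Phase 1: gather each surviving row (identified by its offset within the
  // fragment) into columnar form, one slot per output row.
  for (size_t row = 0; row < surviving_offsets.size(); ++row) {
    for (auto& converter : converters) {
      converter->convertToColumnarFormat(row, surviving_offsets[row]);
    }
  }
  // Phase 2: emit one DataBlockPtr per converted column.
  for (auto& converter : converters) {
    converter->addDataBlocksToInsertData(insert_data);
  }
}

updateColumns() below drives the same two phases, running phase 1 through a parallel row_converter lambda.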
110 
111 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
112 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
113  using ColumnDataPtr =
114  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
115 
116  const Chunk_NS::Chunk* chunk_;
117  const ColumnDescriptor* column_descriptor_;
118  ColumnDataPtr column_data_;
119  const BUFFER_DATA_TYPE* data_buffer_addr_;
120 
121  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
122  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
123  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
124  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
125  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
126  }
127 
128  ~ScalarChunkConverter() override {}
129 
130  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
131  auto buffer_value = data_buffer_addr_[indexInFragment];
132  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
133  column_data_.get()[row] = insert_value;
134  }
135 
136  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
137  DataBlockPtr dataBlock;
138  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
139  insertData.data.push_back(dataBlock);
140  insertData.columnIds.push_back(column_descriptor_->columnId);
141  }
142 };
143 
144 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
145  const Chunk_NS::Chunk* chunk_;
146  const ColumnDescriptor* column_descriptor_;
147 
148  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
149  const int8_t* data_buffer_addr_;
150  size_t fixed_array_length_;
151 
152  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
153  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
154  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
155  data_buffer_addr_ = (chunk->getBuffer()) ? chunk->getBuffer()->getMemoryPtr() : nullptr;
156  fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
157  }
158 
159  ~FixedLenArrayChunkConverter() override {}
160 
161  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
162  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
163 
164  bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
165  src_value_ptr);
166 
167  (*column_data_)[row] = ArrayDatum(
168  fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
169  }
170 
171  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
172  DataBlockPtr dataBlock;
173  dataBlock.arraysPtr = column_data_.get();
174  insertData.data.push_back(dataBlock);
175  insertData.columnIds.push_back(column_descriptor_->columnId);
176  }
177 };
178 
179 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
180  const StringOffsetT* index_buffer_addr_;
181 
182  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
183  : FixedLenArrayChunkConverter(num_rows, chunk) {
184  index_buffer_addr_ =
185  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
186  : nullptr);
187  }
188 
189  ~ArrayChunkConverter() override {}
190 
191  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
192  auto startIndex = index_buffer_addr_[indexInFragment];
193  auto endIndex = index_buffer_addr_[indexInFragment + 1];
194  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
195  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
196  (*column_data_)[row] = ArrayDatum(
197  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
198  }
199 };
200 
201 struct StringChunkConverter : public ChunkToInsertDataConverter {
202  const Chunk_NS::Chunk* chunk_;
203  const ColumnDescriptor* column_descriptor_;
204 
205  std::unique_ptr<std::vector<std::string>> column_data_;
206  const int8_t* data_buffer_addr_;
207  const StringOffsetT* index_buffer_addr_;
208 
209  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
210  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
211  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
212  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
213  index_buffer_addr_ =
214  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
215  : nullptr);
216  }
217 
218  ~StringChunkConverter() override {}
219 
220  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
221  size_t src_value_size =
222  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
223  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
224  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
225  }
226 
227  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
228  DataBlockPtr dataBlock;
229  dataBlock.stringsPtr = column_data_.get();
230  insertData.data.push_back(dataBlock);
231  insertData.columnIds.push_back(column_descriptor_->columnId);
232  }
233 };
234 
235 template <typename BUFFER_DATA_TYPE>
236 struct DateChunkConverter : public ChunkToInsertDataConverter {
237  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
238 
239  const Chunk_NS::Chunk* chunk_;
240  const ColumnDescriptor* column_descriptor_;
241  ColumnDataPtr column_data_;
242  const BUFFER_DATA_TYPE* data_buffer_addr_;
243 
244  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
245  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
246  column_data_ = ColumnDataPtr(
247  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
248  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
249  }
250 
251  ~DateChunkConverter() override {}
252 
253  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
254  auto buffer_value = data_buffer_addr_[indexInFragment];
255  auto insert_value = static_cast<int64_t>(buffer_value);
256  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
257  }
258 
259  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
260  DataBlockPtr dataBlock;
261  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
262  insertData.data.push_back(dataBlock);
263  insertData.columnIds.push_back(column_descriptor_->columnId);
264  }
265 };
266 
267 void InsertOrderFragmenter::updateColumns(
268  const Catalog_Namespace::Catalog* catalog,
269  const TableDescriptor* td,
270  const int fragmentId,
271  const std::vector<TargetMetaInfo> sourceMetaInfo,
272  const std::vector<const ColumnDescriptor*> columnDescriptors,
273  const RowDataProvider& sourceDataProvider,
274  const size_t indexOffFragmentOffsetColumn,
275  const Data_Namespace::MemoryLevel memoryLevel,
276  UpdelRoll& updelRoll,
277  Executor* executor) {
278  updelRoll.is_varlen_update = true;
279  updelRoll.catalog = catalog;
280  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
281  updelRoll.memoryLevel = memoryLevel;
282 
283  size_t num_entries = sourceDataProvider.getEntryCount();
284  size_t num_rows = sourceDataProvider.getRowCount();
285 
286  if (0 == num_rows) {
287  // bail out early
288  return;
289  }
290 
291  TargetValueConverterFactory factory;
292 
293  auto fragment_ptr = getFragmentInfo(fragmentId);
294  auto& fragment = *fragment_ptr;
295  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
296  get_chunks(catalog, td, fragment, memoryLevel, chunks);
297  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
298  columnDescriptors.size());
299  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
300  size_t indexOfDeletedColumn{0};
301  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
302  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
303  auto chunk = chunks[indexOfChunk];
304  const auto chunk_cd = chunk->getColumnDesc();
305 
306  if (chunk_cd->isDeletedCol) {
307  indexOfDeletedColumn = chunk_cd->columnId;
308  deletedChunk = chunk;
309  continue;
310  }
311 
312  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
313  columnDescriptors.end(),
314  [=](const ColumnDescriptor* cd) -> bool {
315  return cd->columnId == chunk_cd->columnId;
316  });
317 
318  if (targetColumnIt != columnDescriptors.end()) {
319  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
320 
321  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
322  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
323 
324  ConverterCreateParameter param{
325  num_rows,
326  *catalog,
327  sourceDataMetaInfo,
328  targetDescriptor,
329  targetDescriptor->columnType,
330  !targetDescriptor->columnType.get_notnull(),
331  sourceDataProvider.getLiteralDictionary(),
333  ? executor->getStringDictionaryProxy(
334  sourceDataMetaInfo.get_type_info().get_comp_param(),
335  executor->getRowSetMemoryOwner(),
336  true)
337  : nullptr};
338  auto converter = factory.create(param);
339  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
340 
341  if (targetDescriptor->columnType.is_geometry()) {
342  // geometry columns are composites
343  // need to skip chunks, depending on geo type
344  switch (targetDescriptor->columnType.get_type()) {
345  case kMULTIPOLYGON:
346  indexOfChunk += 5;
347  break;
348  case kPOLYGON:
349  indexOfChunk += 4;
350  break;
351  case kLINESTRING:
352  indexOfChunk += 2;
353  break;
354  case kPOINT:
355  indexOfChunk += 1;
356  break;
357  default:
358  CHECK(false); // not supported
359  }
360  }
361  } else {
362  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
363  std::unique_ptr<ChunkToInsertDataConverter> converter;
364 
365  if (chunk_cd->columnType.is_fixlen_array()) {
366  converter =
367  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
368  } else if (chunk_cd->columnType.is_string()) {
369  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
370  } else if (chunk_cd->columnType.is_geometry()) {
371  // the logical geo column is a string column
372  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
373  } else {
374  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
375  }
376 
377  chunkConverters.push_back(std::move(converter));
378 
379  } else if (chunk_cd->columnType.is_date_in_days()) {
380  /* Q: Why do we need this?
381  A: In the variable-length update path we move the chunk contents of a
382  column without decoding them. Because the data passes through
383  DateDaysEncoder again, the expected value should be in seconds, but
384  here it is in days. The DateChunkConverter therefore scales chunk
385  values to seconds, which DateDaysEncoder then re-encodes as days.
386  */
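  // Worked example (added for clarity; not part of the original source): a
  // DATE stored as 18262 days (2020-01-01) is scaled by DateChunkConverter to
  // 18262 * 86400 = 1577836800 epoch seconds; DateDaysEncoder later divides by
  // 86400 and stores 18262 days again, so the round trip preserves the value.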
387  std::unique_ptr<ChunkToInsertDataConverter> converter;
388  const size_t physical_size = chunk_cd->columnType.get_size();
389  if (physical_size == 2) {
390  converter =
391  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
392  } else if (physical_size == 4) {
393  converter =
394  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
395  } else {
396  CHECK(false);
397  }
398  chunkConverters.push_back(std::move(converter));
399  } else {
400  std::unique_ptr<ChunkToInsertDataConverter> converter;
401  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
402  int logical_size = logical_type.get_size();
403  int physical_size = chunk_cd->columnType.get_size();
404 
405  if (logical_type.is_string()) {
406  // for dicts -> logical = physical
407  logical_size = physical_size;
408  }
409 
410  if (8 == physical_size) {
411  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
412  num_rows, chunk.get());
413  } else if (4 == physical_size) {
414  if (8 == logical_size) {
415  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
416  num_rows, chunk.get());
417  } else {
418  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
419  num_rows, chunk.get());
420  }
421  } else if (2 == chunk_cd->columnType.get_size()) {
422  if (8 == logical_size) {
423  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
424  num_rows, chunk.get());
425  } else if (4 == logical_size) {
426  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
427  num_rows, chunk.get());
428  } else {
429  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
430  num_rows, chunk.get());
431  }
432  } else if (1 == chunk_cd->columnType.get_size()) {
433  if (8 == logical_size) {
434  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
435  num_rows, chunk.get());
436  } else if (4 == logical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
438  num_rows, chunk.get());
439  } else if (2 == logical_size) {
440  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
441  num_rows, chunk.get());
442  } else {
443  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
444  num_rows, chunk.get());
445  }
446  } else {
447  CHECK(false); // unknown
448  }
449 
450  chunkConverters.push_back(std::move(converter));
451  }
452  }
453  }
454 
455  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
456  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
457 
458  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
459  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
460  deletedChunk->getColumnDesc()->tableId,
461  deletedChunk->getColumnDesc()->columnId,
462  fragment.fragmentId};
463  updelRoll.dirtyChunkeys.insert(chunkey);
464  bool* deletedChunkBuffer =
465  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
466 
467  std::atomic<size_t> row_idx{0};
468 
469  auto row_converter = [&sourceDataProvider,
470  &sourceDataConverters,
471  &indexOffFragmentOffsetColumn,
472  &chunkConverters,
473  &deletedChunkBuffer,
474  &row_idx](size_t indexOfEntry) -> void {
475  // convert the source data
476  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
477  if (row.empty()) {
478  return;
479  }
480 
481  size_t indexOfRow = row_idx.fetch_add(1);
482 
483  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
484  if (sourceDataConverters[col]) {
485  const auto& mapd_variant = row[col];
486  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
487  }
488  }
489 
490  auto scalar = checked_get(
491  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
492  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
493 
494  // convert the remaining chunks
495  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
496  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
497  }
498 
499  // now mark the row as deleted
500  deletedChunkBuffer[indexInChunkBuffer] = true;
501  };
502 
503  bool can_go_parallel = num_rows > 20000;
504 
505  if (can_go_parallel) {
506  const size_t num_worker_threads = cpu_threads();
507  std::vector<std::future<void>> worker_threads;
508  for (size_t i = 0,
509  start_entry = 0,
510  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
511  i < num_worker_threads && start_entry < num_entries;
512  ++i, start_entry += stride) {
513  const auto end_entry = std::min(start_entry + stride, num_rows);
514  worker_threads.push_back(std::async(
515  std::launch::async,
516  [&row_converter](const size_t start, const size_t end) {
517  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
518  row_converter(indexOfRow);
519  }
520  },
521  start_entry,
522  end_entry));
523  }
524 
525  for (auto& child : worker_threads) {
526  child.wait();
527  }
528 
529  } else {
530  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
531  row_converter(entryIdx);
532  }
533  }
534 
535  Fragmenter_Namespace::InsertData insert_data;
536  insert_data.databaseId = catalog->getCurrentDB().dbId;
537  insert_data.tableId = td->tableId;
538 
539  for (size_t i = 0; i < chunkConverters.size(); i++) {
540  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
541  continue;
542  }
543 
544  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
545  if (sourceDataConverters[i]) {
546  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
547  }
548  continue;
549  }
550 
551  insert_data.numRows = num_rows;
552  insert_data.is_default.resize(insert_data.columnIds.size(), false);
553  insertDataNoCheckpoint(insert_data);
554 
555  // update metadata for the deleted chunk as we are doing special handling
556  auto chunk_meta_it = fragment.getChunkMetadataMap().find(indexOfDeletedColumn);
557  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
558  chunk_meta_it->second->chunkStats.max.boolval = 1;
559 
560  // I'm not completely sure that we need to do this in the fragmenter and on
561  // the buffer, but leaving this alone for now
562  if (!deletedChunk->getBuffer()->hasEncoder()) {
563  deletedChunk->initEncoder();
564  }
565  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);
566 
567  if (fragment.shadowNumTuples > deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
568  // An append to the same fragment will increase shadowNumTuples.
569  // Update NumElems in this case. Otherwise, use existing NumElems.
570  deletedChunk->getBuffer()->getEncoder()->setNumElems(fragment.shadowNumTuples);
571  }
572  deletedChunk->getBuffer()->setUpdated();
573 }
574 
575 namespace {
576 inline void update_metadata(SQLTypeInfo const& ti,
577  ChunkUpdateStats& update_stats,
578  int64_t const updated_val,
579  int64_t const old_val,
580  NullSentinelSupplier s = NullSentinelSupplier()) {
581  if (ti.get_notnull()) {
582  set_minmax(update_stats.new_values_stats.min_int64t,
583  update_stats.new_values_stats.max_int64t,
584  updated_val);
585  set_minmax(update_stats.old_values_stats.min_int64t,
586  update_stats.old_values_stats.max_int64t,
587  old_val);
588  } else {
589  set_minmax(update_stats.new_values_stats.min_int64t,
590  update_stats.new_values_stats.max_int64t,
591  update_stats.new_values_stats.has_null,
592  updated_val,
593  s(ti, updated_val));
594  set_minmax(update_stats.old_values_stats.min_int64t,
595  update_stats.old_values_stats.max_int64t,
596  update_stats.old_values_stats.has_null,
597  old_val,
598  s(ti, old_val));
599  }
600 }
601 
602 inline void update_metadata(SQLTypeInfo const& ti,
603  ChunkUpdateStats& update_stats,
604  double const updated_val,
605  double const old_val,
606  NullSentinelSupplier s = NullSentinelSupplier()) {
607  if (ti.get_notnull()) {
608  set_minmax(update_stats.new_values_stats.min_double,
609  update_stats.new_values_stats.max_double,
610  updated_val);
611  set_minmax(update_stats.old_values_stats.min_double,
612  update_stats.old_values_stats.max_double,
613  old_val);
614  } else {
615  set_minmax(update_stats.new_values_stats.min_double,
616  update_stats.new_values_stats.max_double,
617  update_stats.new_values_stats.has_null,
618  updated_val,
619  s(ti, updated_val));
620  set_minmax(update_stats.old_values_stats.min_double,
621  update_stats.old_values_stats.max_double,
622  update_stats.old_values_stats.has_null,
623  old_val,
624  s(ti, old_val));
625  }
626 }
627 
628 inline void update_metadata(UpdateValuesStats& agg_stats,
629  const UpdateValuesStats& new_stats) {
630  agg_stats.has_null = agg_stats.has_null || new_stats.has_null;
631  agg_stats.max_double = std::max<double>(agg_stats.max_double, new_stats.max_double);
632  agg_stats.min_double = std::min<double>(agg_stats.min_double, new_stats.min_double);
633  agg_stats.max_int64t = std::max<int64_t>(agg_stats.max_int64t, new_stats.max_int64t);
634  agg_stats.min_int64t = std::min<int64_t>(agg_stats.min_int64t, new_stats.min_int64t);
635 }
636 } // namespace
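// Worked example (added; not part of the original source): merging per-thread
// stats with update_metadata(agg_stats, new_stats), where agg = {min_int64t = 5,
// max_int64t = 10, has_null = false} and new = {min_int64t = 2, max_int64t = 7,
// has_null = true}, yields agg = {min_int64t = 2, max_int64t = 10, has_null = true}.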
637 
638 std::optional<ChunkUpdateStats> InsertOrderFragmenter::updateColumn(
639  const Catalog_Namespace::Catalog* catalog,
640  const TableDescriptor* td,
641  const ColumnDescriptor* cd,
642  const int fragment_id,
643  const std::vector<uint64_t>& frag_offsets,
644  const std::vector<ScalarTargetValue>& rhs_values,
645  const SQLTypeInfo& rhs_type,
646  const Data_Namespace::MemoryLevel memory_level,
647  UpdelRoll& updel_roll) {
648  updel_roll.catalog = catalog;
649  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
650  updel_roll.memoryLevel = memory_level;
651 
652  const size_t ncore = cpu_threads();
653  const auto nrow = frag_offsets.size();
654  const auto n_rhs_values = rhs_values.size();
655  if (0 == nrow) {
656  return {};
657  }
658  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
659 
660  auto fragment_ptr = getFragmentInfo(fragment_id);
661  auto& fragment = *fragment_ptr;
662  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
663  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
664  ChunkKey chunk_key{
665  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
666  auto chunk = Chunk_NS::Chunk::getChunk(cd,
667  &catalog->getDataMgr(),
668  chunk_key,
669  memory_level,
670  0,
671  chunk_meta_it->second->numBytes,
672  chunk_meta_it->second->numElements);
673 
674  std::vector<ChunkUpdateStats> update_stats_per_thread(ncore);
675 
676  // parallel update elements
677  std::vector<std::future<void>> threads;
678 
679  const auto segsz = (nrow + ncore - 1) / ncore;
680  auto dbuf = chunk->getBuffer();
681  auto dbuf_addr = dbuf->getMemoryPtr();
682  dbuf->setUpdated();
683  {
684  std::lock_guard<std::mutex> lck(updel_roll.mutex);
685  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
686  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
687  }
688 
689  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
690  cd->tableId,
691  cd->columnId,
692  fragment.fragmentId};
693  updel_roll.dirtyChunkeys.insert(chunkey);
694  }
695  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
696  threads.emplace_back(std::async(
697  std::launch::async, [=, &update_stats_per_thread, &frag_offsets, &rhs_values] {
698  SQLTypeInfo lhs_type = cd->columnType;
699 
700  // !! not sure if this is an undocumented convention or a bug, but for a
701  // sharded table the dictionary id of a dict-encoded string column is not
702  // specified by comp_param in the physical table but in the logical table;
703  // comp_param in the physical table is always 0, so we need to adapt accordingly.
704  auto cdl = (shard_ < 0)
705  ? cd
706  : catalog->getMetadataForColumn(
707  catalog->getLogicalTableId(td->tableId), cd->columnId);
708  CHECK(cdl);
709  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
710  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
711  lhs_type, &decimalOverflowValidator);
712  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
713  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
714  lhs_type, &dateDaysOverflowValidator);
715 
716  StringDictionary* stringDict{nullptr};
717  if (lhs_type.is_string()) {
718  CHECK(kENCODING_DICT == lhs_type.get_compression());
719  auto dictDesc = const_cast<DictDescriptor*>(
720  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
721  CHECK(dictDesc);
722  stringDict = dictDesc->stringDict.get();
723  CHECK(stringDict);
724  }
725 
726  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
727  const auto roffs = frag_offsets[r];
728  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
729  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
730  ScalarTargetValue sv2;
731 
732  // A subtlety here concerns two cases of string-to-string assignment, where
733  // upstream passes the RHS string as a string index instead of the preferred
734  // "real string".
735  //   case #1. For "SET str_col = str_literal", a temporary string index is
736  //            hard to resolve
737  //            in this layer, so if upstream passes a string index here, an
738  //            exception is thrown.
739  //   case #2. For "SET str_col1 = str_col2", the RHS string index is converted
740  //            to the LHS string index.
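  // Illustration (added; not part of the original source), assuming a table t
  // with dictionary-encoded string columns str_col1 and str_col2:
  //   case #1: UPDATE t SET str_col1 = 'foo';
  //            the literal arrives as a transient dictionary index whose
  //            dictionary cannot be found here, so the code below throws
  //            "UPDATE does not support cast from string literal to string column."
  //   case #2: UPDATE t SET str_col1 = str_col2;
  //            the RHS index is resolved to its string via the RHS dictionary
  //            and later re-encoded through the LHS dictionary's getOrAdd().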
741  if (rhs_type.is_string()) {
742  if (const auto vp = boost::get<int64_t>(sv)) {
743  auto dictDesc = const_cast<DictDescriptor*>(
744  catalog->getMetadataForDict(rhs_type.get_comp_param()));
745  if (nullptr == dictDesc) {
746  throw std::runtime_error(
747  "UPDATE does not support cast from string literal to string "
748  "column.");
749  }
750  auto stringDict = dictDesc->stringDict.get();
751  CHECK(stringDict);
752  sv2 = NullableString(stringDict->getString(*vp));
753  sv = &sv2;
754  }
755  }
756 
757  if (const auto vp = boost::get<int64_t>(sv)) {
758  auto v = *vp;
759  if (lhs_type.is_string()) {
760  throw std::runtime_error("UPDATE does not support cast to string.");
761  }
762  int64_t old_val;
763  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
764  // Handle special case where date column with date in days encoding stores
765  // metadata in epoch seconds.
766  if (lhs_type.is_date_in_days()) {
767  old_val = DateConverters::get_epoch_seconds_from_days(old_val);
768  }
769  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
770  if (lhs_type.is_decimal()) {
771  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
772  int64_t decimal_val;
773  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
774  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
775  lhs_type.get_notnull() == false)
776  ? v
777  : decimal_val;
778  update_metadata(
779  lhs_type, update_stats_per_thread[c], target_value, old_val);
780  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
781  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
782  if (positive_v_and_negative_d || negative_v_and_positive_d) {
783  throw std::runtime_error(
784  "Data conversion overflow on " + std::to_string(v) +
785  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
786  std::to_string(rhs_type.get_scale()) + ") to (" +
787  std::to_string(lhs_type.get_dimension()) + ", " +
788  std::to_string(lhs_type.get_scale()) + ")");
789  }
790  } else if (is_integral(lhs_type)) {
791  if (lhs_type.is_date_in_days()) {
792  // Store meta values in seconds
793  if (lhs_type.get_size() == 2) {
794  nullAwareDateOverflowValidator.validate<int16_t>(v);
795  } else {
796  nullAwareDateOverflowValidator.validate<int32_t>(v);
797  }
798  int64_t days;
799  get_scalar<int64_t>(data_ptr, lhs_type, days);
800  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
801  int64_t target_value = (v == inline_int_null_value<int64_t>() &&
802  lhs_type.get_notnull() == false)
803  ? NullSentinelSupplier()(lhs_type, v)
804  : seconds;
805  update_metadata(
806  lhs_type, update_stats_per_thread[c], target_value, old_val);
807  } else {
808  int64_t target_value;
809  if (rhs_type.is_decimal()) {
810  target_value = round(decimal_to_double(rhs_type, v));
811  } else {
812  target_value = v;
813  }
814  update_metadata(
815  lhs_type, update_stats_per_thread[c], target_value, old_val);
816  }
817  } else {
818  if (rhs_type.is_decimal()) {
819  update_metadata(lhs_type,
820  update_stats_per_thread[c],
821  decimal_to_double(rhs_type, v),
822  double(old_val));
823  } else {
824  update_metadata(lhs_type, update_stats_per_thread[c], v, old_val);
825  }
826  }
827  } else if (const auto vp = boost::get<double>(sv)) {
828  auto v = *vp;
829  if (lhs_type.is_string()) {
830  throw std::runtime_error("UPDATE does not support cast to string.");
831  }
832  double old_val;
833  get_scalar<double>(data_ptr, lhs_type, old_val);
834  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
835  if (lhs_type.is_integer()) {
836  update_metadata(
837  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
838  } else if (lhs_type.is_fp()) {
839  update_metadata(
840  lhs_type, update_stats_per_thread[c], double(v), double(old_val));
841  } else {
842  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
843  "LHS with a floating RHS.";
844  }
845  } else if (const auto vp = boost::get<float>(sv)) {
846  auto v = *vp;
847  if (lhs_type.is_string()) {
848  throw std::runtime_error("UPDATE does not support cast to string.");
849  }
850  float old_val;
851  get_scalar<float>(data_ptr, lhs_type, old_val);
852  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
853  if (lhs_type.is_integer()) {
854  update_metadata(
855  lhs_type, update_stats_per_thread[c], int64_t(v), int64_t(old_val));
856  } else {
857  update_metadata(lhs_type, update_stats_per_thread[c], double(v), old_val);
858  }
859  } else if (const auto vp = boost::get<NullableString>(sv)) {
860  const auto s = boost::get<std::string>(vp);
861  const auto sval = s ? *s : std::string("");
862  if (lhs_type.is_string()) {
863  decltype(stringDict->getOrAdd(sval)) sidx;
864  {
865  std::unique_lock<std::mutex> lock(temp_mutex_);
866  sidx = stringDict->getOrAdd(sval);
867  }
868  int64_t old_val;
869  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
870  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
871  update_metadata(
872  lhs_type, update_stats_per_thread[c], int64_t(sidx), old_val);
873  } else if (sval.size() > 0) {
874  auto dval = std::atof(sval.data());
875  if (lhs_type.is_boolean()) {
876  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
877  } else if (lhs_type.is_time()) {
878  throw std::runtime_error(
879  "Date/Time/Timestamp update not supported through translated "
880  "string path.");
881  }
882  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
883  double old_val;
884  get_scalar<double>(data_ptr, lhs_type, old_val);
885  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
886  update_metadata(
887  lhs_type, update_stats_per_thread[c], double(dval), old_val);
888  } else {
889  int64_t old_val;
890  get_scalar<int64_t>(data_ptr, lhs_type, old_val);
891  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
892  update_metadata(
893  lhs_type, update_stats_per_thread[c], int64_t(dval), old_val);
894  }
895  } else {
896  put_null(data_ptr, lhs_type, cd->columnName);
897  update_stats_per_thread[c].new_values_stats.has_null = true;
898  }
899  } else {
900  CHECK(false);
901  }
902  }
903  }));
904  if (threads.size() >= (size_t)cpu_threads()) {
905  wait_cleanup_threads(threads);
906  }
907  }
908  wait_cleanup_threads(threads);
909 
910  // for unit test
912  if (cd->isDeletedCol) {
913  const auto deleted_offsets = getVacuumOffsets(chunk);
914  if (deleted_offsets.size() > 0) {
915  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
916  return {};
917  }
918  }
919  }
920  ChunkUpdateStats update_stats;
921  for (size_t c = 0; c < ncore; ++c) {
922  update_metadata(update_stats.new_values_stats,
923  update_stats_per_thread[c].new_values_stats);
924  update_metadata(update_stats.old_values_stats,
925  update_stats_per_thread[c].old_values_stats);
926  }
927 
928  CHECK_GT(fragment.shadowNumTuples, size_t(0));
929  updateColumnMetadata(
930  cd, fragment, chunk, update_stats.new_values_stats, cd->columnType, updel_roll);
931  update_stats.updated_rows_count = nrow;
932  update_stats.fragment_rows_count = fragment.shadowNumTuples;
933  update_stats.chunk = chunk;
934  return update_stats;
935 }
936 
937 void InsertOrderFragmenter::updateColumnMetadata(
938  const ColumnDescriptor* cd,
939  FragmentInfo& fragment,
940  std::shared_ptr<Chunk_NS::Chunk> chunk,
941  const UpdateValuesStats& new_values_stats,
942  const SQLTypeInfo& rhs_type,
943  UpdelRoll& updel_roll) {
944  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
945  auto key = std::make_pair(td, &fragment);
946  std::lock_guard<std::mutex> lck(updel_roll.mutex);
947  mapd_unique_lock<mapd_shared_mutex> write_lock(fragmentInfoMutex_);
948  if (0 == updel_roll.chunkMetadata.count(key)) {
949  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
950  }
951  if (0 == updel_roll.numTuples.count(key)) {
952  updel_roll.numTuples[key] = fragment.shadowNumTuples;
953  }
954  // at this point we are just looking at a reference to the real active metadata;
955  // it is unsafe to do operations on this directly. Should this be a copy?
956  auto& chunkMetadata = updel_roll.chunkMetadata[key];
957 
958  auto buffer = chunk->getBuffer();
959  const auto& lhs_type = cd->columnType;
960 
961  auto encoder = buffer->getEncoder();
962  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
963  static_assert(std::is_same<decltype(min), decltype(max)>::value,
964  "Type mismatch on min/max");
965  if (has_null) {
966  encoder->updateStats(decltype(min)(), true);
967  }
968  if (max < min) {
969  return;
970  }
971  encoder->updateStats(min, false);
972  encoder->updateStats(max, false);
973  };
974 
975  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
976  update_stats(new_values_stats.min_int64t,
977  new_values_stats.max_int64t,
978  new_values_stats.has_null);
979  } else if (lhs_type.is_fp()) {
980  update_stats(new_values_stats.min_double,
981  new_values_stats.max_double,
982  new_values_stats.has_null);
983  } else if (lhs_type.is_decimal()) {
984  update_stats((int64_t)(new_values_stats.min_double * pow(10, lhs_type.get_scale())),
985  (int64_t)(new_values_stats.max_double * pow(10, lhs_type.get_scale())),
986  new_values_stats.has_null);
987  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
988  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
989  update_stats(new_values_stats.min_int64t,
990  new_values_stats.max_int64t,
991  new_values_stats.has_null);
992  }
993  buffer->getEncoder()->getMetadata(chunkMetadata[cd->columnId]);
994 }
995 
996 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
997  const MetaDataKey& key,
998  UpdelRoll& updel_roll) {
999  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
1000  if (updel_roll.chunkMetadata.count(key)) {
1001  auto& fragmentInfo = *key.second;
1002  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
1003  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
1004  fragmentInfo.setChunkMetadataMap(chunkMetadata);
1005  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
1006  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
1007  }
1008 }
1009 
1010 std::vector<std::shared_ptr<Chunk_NS::Chunk>> InsertOrderFragmenter::getChunksForAllColumns(
1011  const TableDescriptor* td,
1012  const FragmentInfo& fragment,
1013  const Data_Namespace::MemoryLevel memory_level) {
1014  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1015  // coming from updateColumn (on the '$delete$' column) we don't have chunks for all columns
1016  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1017  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1018  ++ncol;
1019  if (!cd->isVirtualCol) {
1020  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1021  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1022  ChunkKey chunk_key{
1023  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1024  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1025  &catalog_->getDataMgr(),
1026  chunk_key,
1027  memory_level,
1028  0,
1029  chunk_meta_it->second->numBytes,
1030  chunk_meta_it->second->numElements);
1031  chunks.push_back(chunk);
1032  }
1033  }
1034  }
1035  return chunks;
1036 }
1037 
1038 // get a sorted vector of offsets of rows to vacuum
1039 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1040  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1041  const auto data_buffer = chunk->getBuffer();
1042  const auto data_addr = data_buffer->getMemoryPtr();
1043  const size_t nrows_in_chunk = data_buffer->size();
1044  const size_t ncore = cpu_threads();
1045  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1046  std::vector<std::vector<uint64_t>> deleted_offsets;
1047  deleted_offsets.resize(ncore);
1048  std::vector<std::future<void>> threads;
1049  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1050  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1051  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1052  const auto ithread = rbegin / segsz;
1053  CHECK(ithread < deleted_offsets.size());
1054  deleted_offsets[ithread].reserve(segsz);
1055  for (size_t r = rbegin; r < rend; ++r) {
1056  if (data_addr[r]) {
1057  deleted_offsets[ithread].push_back(r);
1058  }
1059  }
1060  }));
1061  }
1062  wait_cleanup_threads(threads);
1063  std::vector<uint64_t> all_deleted_offsets;
1064  for (size_t i = 0; i < ncore; ++i) {
1065  all_deleted_offsets.insert(
1066  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1067  }
1068  return all_deleted_offsets;
1069 }
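// Worked example (added; not part of the original source): with a delete-flag
// buffer of {0, 1, 1, 0, 1, 0} and two threads covering rows [0, 3) and [3, 6),
// thread 0 collects {1, 2} and thread 1 collects {4}; concatenating the
// per-thread vectors in thread order yields {1, 2, 4}, which is already sorted
// because each thread scans a disjoint, increasing range of row offsets.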
1070 
1071 template <typename T>
1072 static void set_chunk_stats(const SQLTypeInfo& col_type,
1073  int8_t* data_addr,
1074  bool& has_null,
1075  T& min,
1076  T& max) {
1077  T v;
1078  const auto can_be_null = !col_type.get_notnull();
1079  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1080  if (is_null) {
1081  has_null = has_null || (can_be_null && is_null);
1082  } else {
1083  set_minmax(min, max, v);
1084  }
1085 }
1086 
1087 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1088  FragmentInfo& fragment,
1089  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1090  const size_t nrows_to_keep,
1091  UpdelRoll& updel_roll) {
1092  auto cd = chunk->getColumnDesc();
1093  auto td = catalog->getMetadataForTable(cd->tableId);
1094  auto data_buffer = chunk->getBuffer();
1095  std::lock_guard<std::mutex> lck(updel_roll.mutex);
1096  const auto key = std::make_pair(td, &fragment);
1097  if (0 == updel_roll.chunkMetadata.count(key)) {
1098  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
1099  }
1100  auto& chunkMetadata = updel_roll.chunkMetadata[key];
1101  chunkMetadata[cd->columnId]->numElements = nrows_to_keep;
1102  chunkMetadata[cd->columnId]->numBytes = data_buffer->size();
1103  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
1104  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
1105  ChunkKey chunk_key{
1106  catalog->getDatabaseId(), cd->tableId, cd->columnId, fragment.fragmentId};
1107  updel_roll.dirtyChunkeys.emplace(chunk_key);
1108  }
1109 }
1110 
1111 auto vacuum_fixlen_rows(
1112  const FragmentInfo& fragment,
1113  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1114  const std::vector<uint64_t>& frag_offsets) {
1115  const auto cd = chunk->getColumnDesc();
1116  const auto& col_type = cd->columnType;
1117  auto data_buffer = chunk->getBuffer();
1118  auto data_addr = data_buffer->getMemoryPtr();
1119  auto element_size =
1120  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1121  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1122  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1123  size_t nbytes_fix_data_to_keep = 0;
1124  auto nrows_to_vacuum = frag_offsets.size();
1125  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1126  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1127  auto is_last_one = irow == nrows_to_vacuum;
1128  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1129  auto maddr_to_vacuum = data_addr;
1130  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1131  if (nrows_to_keep > 0) {
1132  auto nbytes_to_keep = nrows_to_keep * element_size;
1133  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1134  // move curr fixlen row block toward front
1135  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1136  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1137  nbytes_to_keep);
1138  }
1139  irow_of_blk_to_fill += nrows_to_keep;
1140  nbytes_fix_data_to_keep += nbytes_to_keep;
1141  }
1142  irow_of_blk_to_keep = irow_to_vacuum + 1;
1143  }
1144  return nbytes_fix_data_to_keep;
1145 }
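// Worked example (added; not part of the original source): with 8 fixed-length
// rows of element_size 4 and frag_offsets = {2, 5}, the loop keeps rows {0, 1}
// in place, memmoves rows {3, 4} down to slots {2, 3}, then memmoves rows {6, 7}
// down to slots {4, 5}, and returns 6 * 4 = 24 bytes of kept fixed-length data.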
1146 
1147 // Gets the initial padding required for the chunk buffer. For variable length array
1148 // columns, if the first element after vacuuming is going to be a null array, a padding
1149 // with a value that is greater than 0 is expected.
1150 size_t get_null_padding(bool is_varlen_array,
1151  const std::vector<uint64_t>& frag_offsets,
1152  const StringOffsetT* index_array,
1153  size_t fragment_row_count) {
1154  if (is_varlen_array) {
1155  size_t first_non_deleted_row_index{0};
1156  for (auto deleted_offset : frag_offsets) {
1157  if (first_non_deleted_row_index < deleted_offset) {
1158  break;
1159  } else {
1160  first_non_deleted_row_index++;
1161  }
1162  }
1163  CHECK_LT(first_non_deleted_row_index, fragment_row_count);
1164  if (first_non_deleted_row_index == 0) {
1165  // If the first row in the fragment is not deleted, then the first offset in the
1166  // index buffer/array already contains expected padding.
1167  return index_array[0];
1168  } else {
1169  // If the first non-deleted element is a null array (identified by a negative
1170  // offset), get a padding value for the chunk buffer.
1171  if (index_array[first_non_deleted_row_index + 1] < 0) {
1172  size_t first_non_zero_offset{0};
1173  for (size_t i = 0; i <= first_non_deleted_row_index; i++) {
1174  if (index_array[i] != 0) {
1175  first_non_zero_offset = index_array[i];
1176  break;
1177  }
1178  }
1179  CHECK_GT(first_non_zero_offset, static_cast<size_t>(0));
1180  return std::min(static_cast<size_t>(std::abs(index_array[first_non_deleted_row_index + 1])),
1181  first_non_zero_offset);
1182  } else {
1183  return 0;
1184  }
1185  }
1186  } else {
1187  return 0;
1188  }
1189 }
1190 
1191 // Gets the indexes of variable length null arrays in the chunk after vacuuming.
1192 std::set<size_t> get_var_len_null_array_indexes(const SQLTypeInfo sql_type_info,
1193  const std::vector<uint64_t>& frag_offsets,
1194  const StringOffsetT* index_array,
1195  size_t fragment_row_count) {
1196  std::set<size_t> null_array_indexes;
1197  if (sql_type_info.is_varlen_array() && !sql_type_info.get_notnull()) {
1198  size_t frag_offset_index{0};
1199  size_t vacuum_offset{0};
1200  for (size_t i = 0; i < fragment_row_count; i++) {
1201  if (frag_offset_index < frag_offsets.size() &&
1202  i == frag_offsets[frag_offset_index]) {
1203  frag_offset_index++;
1204  vacuum_offset++;
1205  } else if (index_array[i + 1] < 0) {
1206  null_array_indexes.emplace(i - vacuum_offset);
1207  }
1208  }
1209  }
1210  return null_array_indexes;
1211 }
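// Worked example (added; not part of the original source): for a nullable
// varlen-array column with 5 rows, index_array = {0, 8, -8, 16, -16, 24} and
// frag_offsets = {1}: row 1 is deleted, row 3 is a null array (its end offset
// index_array[4] is negative), and after vacuuming it shifts up by one slot,
// so the function returns {2}.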
1212 
1213 StringOffsetT get_buffer_offset(bool is_varlen_array,
1214  const StringOffsetT* index_array,
1215  size_t index) {
1216  auto offset = index_array[index];
1217  if (offset < 0) {
1218  // Variable length arrays encode null arrays as negative offsets
1219  CHECK(is_varlen_array);
1220  offset = -offset;
1221  }
1222  return offset;
1223 }
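// Example (added; not part of the original source): with index_array =
// {0, 12, -20, 28}, row 1 is a null array whose end offset is stored as -20;
// get_buffer_offset(true, index_array, 2) returns 20, the physical buffer
// position, while the negative sign elsewhere continues to mark the null.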
1224 
1225 auto vacuum_varlen_rows(
1226  const FragmentInfo& fragment,
1227  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1228  const std::vector<uint64_t>& frag_offsets) {
1229  auto is_varlen_array = chunk->getColumnDesc()->columnType.is_varlen_array();
1230  auto data_buffer = chunk->getBuffer();
1231  CHECK(data_buffer);
1232  auto index_buffer = chunk->getIndexBuf();
1233  CHECK(index_buffer);
1234  auto data_addr = data_buffer->getMemoryPtr();
1235  auto indices_addr = index_buffer->getMemoryPtr();
1236  CHECK(indices_addr);
1237  auto index_array = (StringOffsetT*)indices_addr;
1238  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1239  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1240  size_t nbytes_fix_data_to_keep = 0;
1241  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1242  size_t null_padding =
1243  get_null_padding(is_varlen_array, frag_offsets, index_array, nrows_in_fragment);
1244  size_t nbytes_var_data_to_keep = null_padding;
1245  auto null_array_indexes = get_var_len_null_array_indexes(
1246  chunk->getColumnDesc()->columnType, frag_offsets, index_array, nrows_in_fragment);
1247  auto nrows_to_vacuum = frag_offsets.size();
1248  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1249  auto is_last_one = irow == nrows_to_vacuum;
1250  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1251  auto maddr_to_vacuum = data_addr;
1252  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1253  if (nrows_to_keep > 0) {
1254  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1255  auto deleted_row_start_offset =
1256  get_buffer_offset(is_varlen_array, index_array, irow_to_vacuum);
1257  auto kept_row_start_offset =
1258  get_buffer_offset(is_varlen_array, index_array, irow_of_blk_to_keep);
1259  auto nbytes_to_keep =
1260  (is_last_one ? data_buffer->size() : deleted_row_start_offset) -
1261  kept_row_start_offset;
1262  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1263  if (nbytes_to_keep > 0) {
1264  CHECK(data_addr);
1265  // move curr varlen row block toward front
1266  memmove(data_addr + ibyte_var_data_to_keep,
1267  data_addr + kept_row_start_offset,
1268  nbytes_to_keep);
1269  }
1270 
1271  const auto base_offset = kept_row_start_offset;
1272  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1273  auto update_index = irow_of_blk_to_keep + i;
1274  auto offset = get_buffer_offset(is_varlen_array, index_array, update_index);
1275  index_array[update_index] = ibyte_var_data_to_keep + (offset - base_offset);
1276  }
1277  }
1278  nbytes_var_data_to_keep += nbytes_to_keep;
1279  maddr_to_vacuum = indices_addr;
1280 
1281  constexpr static auto index_element_size = sizeof(StringOffsetT);
1282  nbytes_to_keep = nrows_to_keep * index_element_size;
1283  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1284  // move curr fixlen row block toward front
1285  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1286  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1287  nbytes_to_keep);
1288  }
1289  irow_of_blk_to_fill += nrows_to_keep;
1290  nbytes_fix_data_to_keep += nbytes_to_keep;
1291  }
1292  irow_of_blk_to_keep = irow_to_vacuum + 1;
1293  }
1294 
1295  // Set expected null padding, last offset, and negative values for null array offsets.
1296  index_array[0] = null_padding;
1297  auto post_vacuum_row_count = nrows_in_fragment - nrows_to_vacuum;
1298  index_array[post_vacuum_row_count] = nbytes_var_data_to_keep;
1299  if (!is_varlen_array) {
1300  CHECK(null_array_indexes.empty());
1301  }
1302  for (auto index : null_array_indexes) {
1303  index_array[index + 1] = -1 * std::abs(index_array[index + 1]);
1304  }
1305  return nbytes_var_data_to_keep;
1306 }
1307 
1308 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1309  const TableDescriptor* td,
1310  const int fragment_id,
1311  const std::vector<uint64_t>& frag_offsets,
1312  const Data_Namespace::MemoryLevel memory_level,
1313  UpdelRoll& updel_roll) {
1314  auto fragment_ptr = getFragmentInfo(fragment_id);
1315  auto& fragment = *fragment_ptr;
1316  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1317  const auto ncol = chunks.size();
1318 
1319  std::vector<ChunkUpdateStats> update_stats_per_thread(ncol);
1320 
1321  // parallel delete columns
1322  std::vector<std::future<void>> threads;
1323  auto nrows_to_vacuum = frag_offsets.size();
1324  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1325  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1326 
1327  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1328  auto chunk = chunks[ci];
1329  const auto cd = chunk->getColumnDesc();
1330  const auto& col_type = cd->columnType;
1331  auto data_buffer = chunk->getBuffer();
1332  auto index_buffer = chunk->getIndexBuf();
1333  auto data_addr = data_buffer->getMemoryPtr();
1334  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1335  auto index_array = (StringOffsetT*)indices_addr;
1336  bool is_varlen = col_type.is_varlen_indeed();
1337 
1338  auto fixlen_vacuum =
1339  [=, &update_stats_per_thread, &updel_roll, &frag_offsets, &fragment] {
1340  size_t nbytes_fix_data_to_keep;
1341  if (nrows_to_keep == 0) {
1342  nbytes_fix_data_to_keep = 0;
1343  } else {
1344  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1345  }
1346 
1347  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1348  data_buffer->setSize(nbytes_fix_data_to_keep);
1349  data_buffer->setUpdated();
1350 
1351  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1352 
1353  auto daddr = data_addr;
1354  auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
1355  : get_element_size(col_type);
1356  data_buffer->getEncoder()->resetChunkStats();
1357  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1358  if (col_type.is_fixlen_array()) {
1359  auto encoder =
1360  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
1361  CHECK(encoder);
1362  encoder->updateMetadata((int8_t*)daddr);
1363  } else if (col_type.is_fp()) {
1364  set_chunk_stats(col_type,
1365  daddr,
1366  update_stats_per_thread[ci].new_values_stats.has_null,
1367  update_stats_per_thread[ci].new_values_stats.min_double,
1368  update_stats_per_thread[ci].new_values_stats.max_double);
1369  } else {
1370  set_chunk_stats(col_type,
1371  daddr,
1372  update_stats_per_thread[ci].new_values_stats.has_null,
1373  update_stats_per_thread[ci].new_values_stats.min_int64t,
1374  update_stats_per_thread[ci].new_values_stats.max_int64t);
1375  }
1376  }
1377  };
1378 
1379  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1380  size_t nbytes_var_data_to_keep;
1381  if (nrows_to_keep == 0) {
1382  nbytes_var_data_to_keep = 0;
1383  } else {
1384  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1385  }
1386 
1387  data_buffer->getEncoder()->setNumElems(nrows_to_keep);
1388  data_buffer->setSize(nbytes_var_data_to_keep);
1389  data_buffer->setUpdated();
1390 
1391  index_buffer->setSize(sizeof(*index_array) *
1392  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1393  index_buffer->setUpdated();
1394 
1395  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1396  };
1397 
1398  if (is_varlen) {
1399  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1400  } else {
1401  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1402  }
1403  if (threads.size() >= (size_t)cpu_threads()) {
1404  wait_cleanup_threads(threads);
1405  }
1406  }
1407 
1408  wait_cleanup_threads(threads);
1409 
1410  auto key = std::make_pair(td, &fragment);
1411  updel_roll.numTuples[key] = nrows_to_keep;
1412  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1413  auto chunk = chunks[ci];
1414  auto cd = chunk->getColumnDesc();
1415  if (!cd->columnType.is_fixlen_array()) {
1416  // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
1417  // stored in seconds. Do the metadata conversion here before updating the chunk
1418  // stats.
1419  if (cd->columnType.is_date_in_days()) {
1420  auto& stats = update_stats_per_thread[ci].new_values_stats;
1421  stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
1422  stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
1423  }
1424  updateColumnMetadata(cd,
1425  fragment,
1426  chunk,
1427  update_stats_per_thread[ci].new_values_stats,
1428  cd->columnType,
1429  updel_roll);
1430  }
1431  }
1432 }
1433 
1434 } // namespace Fragmenter_Namespace
1435 
1436 bool UpdelRoll::commitUpdate() {
1437  if (nullptr == catalog) {
1438  return false;
1439  }
1440  const auto td = catalog->getMetadataForTable(logicalTableId);
1441  CHECK(td);
1442  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
1443  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1444 
1445  // Checkpoint all shards. Otherwise, epochs can go out of sync.
1446  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1447  auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
1448  try {
1449  // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
1450  // `dirtyChunks` has to be cleared before resetting epochs
1451  catalog->checkpoint(logicalTableId);
1452  } catch (...) {
1453  dirtyChunks.clear();
1454  catalog->setTableEpochsLogExceptions(catalog->getDatabaseId(), table_epochs);
1455  throw;
1456  }
1457  }
1458  updateFragmenterAndCleanupChunks();
1459  return true;
1460 }
1461 
1462 void UpdelRoll::stageUpdate() {
1463  CHECK(catalog);
1464  auto db_id = catalog->getDatabaseId();
1465  CHECK(table_descriptor);
1466  auto table_id = table_descriptor->tableId;
1468  CHECK_EQ(table_descriptor->persistenceLevel, Data_Namespace::MemoryLevel::DISK_LEVEL);
1469  const auto table_lock =
1470  lockmgr::TableDataLockMgr::getWriteLockForTable({db_id, logicalTableId});
1471  try {
1472  catalog->getDataMgr().checkpoint(db_id, table_id, memoryLevel);
1473  } catch (...) {
1474  dirtyChunks.clear();
1475  throw;
1476  }
1477  updateFragmenterAndCleanupChunks();
1478 }
1479 
1480 void UpdelRoll::updateFragmenterAndCleanupChunks() {
1481  // for each dirty fragment
1482  for (auto& cm : chunkMetadata) {
1483  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1484  }
1485  dirtyChunks.clear();
1486  // flush gpu dirty chunks if update was not on gpu
1487  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1488  for (const auto& chunkey : dirtyChunkeys) {
1491  }
1492  }
1493 }
1494 
1495 void UpdelRoll::cancelUpdate() {
1496  if (nullptr == catalog) {
1497  return;
1498  }
1499 
1500  // TODO: needed?
1501  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
1502  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
1503  if (is_varlen_update) {
1504  int databaseId = catalog->getDatabaseId();
1505  auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);
1506 
1507  dirtyChunks.clear();
1508  catalog->setTableEpochs(databaseId, table_epochs);
1509  } else {
1510  const auto td = catalog->getMetadataForTable(logicalTableId);
1511  CHECK(td);
1512  if (td->persistenceLevel != memoryLevel) {
1513  for (auto dit : dirtyChunks) {
1514  catalog->getDataMgr().free(dit.first->getBuffer());
1515  dit.first->setBuffer(nullptr);
1516  }
1517  }
1518  }
1519 }
virtual void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData)=0
virtual size_t const getEntryCount() const =0
DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
int getDatabaseId() const
Definition: Catalog.h:277
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems)
Definition: Chunk.cpp:28
int getLogicalTableId(const int physicalTableId) const
Definition: Catalog.cpp:4188
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1494
specifies the content in-memory of a row in the column metadata table
void validate(T value)
Definition: Encoder.h:98
const RETURN_TYPE * checked_get(size_t row, const SOURCE_TYPE *boost_variant, boost_variant_accessor< RETURN_TYPE > &accessor)
bool is_boolean() const
Definition: sqltypes.h:496
void put_null(void *ndptr, const SQLTypeInfo &ntype, const std::string col_name)
void checkpoint(const int logicalTableId) const
Definition: Catalog.cpp:4202
#define CHECK_LT(x, y)
Definition: Logger.h:213
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:436
static void set_chunk_metadata(const Catalog_Namespace::Catalog *catalog, FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const size_t nrows_to_keep, UpdelRoll &updel_roll)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
std::vector< DataBlockPtr > data
the number of rows being inserted
Definition: Fragmenter.h:64
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
AbstractBuffer * getBuffer() const
Definition: Chunk.h:105
void update_metadata(SQLTypeInfo const &ti, ChunkUpdateStats &update_stats, int64_t const updated_val, int64_t const old_val, NullSentinelSupplier s=NullSentinelSupplier())
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:938
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
virtual void convertToColumnarFormat(size_t row, size_t indexInFragment)=0
bool g_enable_experimental_string_functions
Data_Namespace::MemoryLevel persistenceLevel
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:316
virtual StringDictionaryProxy * getLiteralDictionary() const =0
std::unique_ptr< std::vector< std::string > > column_data_
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
int logicalTableId
Definition: UpdelRoll.h:64
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
unencoded fixed length array encoder
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:323
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
void setTableEpochs(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3069
std::map< MetaDataKey, ChunkMetadataMap > chunkMetadata
Definition: UpdelRoll.h:59
#define CHECK(condition)
Definition: Logger.h:203
char * t
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
const ColumnDescriptor * column_descriptor_
Descriptor for a dictionary for a string columne.
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions No locks and check...
const BUFFER_DATA_TYPE * data_buffer_addr_
void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData &insertData) override
static bool is_null(const SQLTypeInfo &type, int8_t *array)
mapd_unique_lock< mapd_shared_mutex > write_lock
void setTableEpochsLogExceptions(const int32_t db_id, const std::vector< TableEpochInfo > &table_epochs) const
Definition: Catalog.cpp:3106
SQLTypeInfo columnType
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:489
virtual std::vector< TargetValue > getEntryAt(const size_t index) const =0
void free(AbstractBuffer *buffer)
Definition: DataMgr.cpp:469
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321
int8_t * numbersPtr
Definition: sqltypes.h:220
unencoded array encoder
void set_minmax(T &min, T &max, T const val)
StringOffsetT get_buffer_offset(bool is_varlen_array, const StringOffsetT *index_array, size_t index)
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:492
size_t get_null_padding(bool is_varlen_array, const std::vector< uint64_t > &frag_offsets, const StringOffsetT *index_array, size_t fragment_row_count)
std::vector< int > columnIds
identifies the table into which the data is being inserted
Definition: Fragmenter.h:62
std::string columnName
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void convertToColumnarFormat(size_t row, size_t indexInFragment) override
std::mutex mutex
Definition: UpdelRoll.h:49
bool is_integral(const SQLTypeInfo &t)
std::unique_ptr< INSERT_DATA_TYPE, CheckedMallocDeleter< INSERT_DATA_TYPE >> ColumnDataPtr
std::vector< TableEpochInfo > getTableEpochs(const int32_t db_id, const int32_t table_id) const
Definition: Catalog.cpp:3041
std::unique_ptr< TargetValueConverter > create(ConverterCreateParameter param)
std::map< MetaDataKey, size_t > numTuples
Definition: UpdelRoll.h:56
virtual size_t const getRowCount() const =0
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk *chunk)