OmniSciDB  340b00dbf6
UpdelStorage.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <boost/variant.hpp>
#include <boost/variant/get.hpp>
#include <limits>
#include <mutex>
#include <string>
#include <vector>

#include "Catalog/Catalog.h"
#include "DataMgr/DataMgr.h"
#include "DataMgr/FileMgr/FileBuffer.h"
#include "Fragmenter/InsertOrderFragmenter.h"
#include "LockMgr/LockMgr.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/TargetValue.h"
#include "Shared/DateConverters.h"
#include "Shared/TypedDataAccessors.h"
#include "Shared/thread_count.h"
#include "TargetValueConvertersFactories.h"

extern bool g_enable_experimental_string_functions;

namespace Fragmenter_Namespace {

inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
  for (auto& t : threads) {
    t.get();
  }
  threads.clear();
}

inline bool is_integral(const SQLTypeInfo& t) {
  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
}

bool FragmentInfo::unconditionalVacuum_{false};

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const ScalarTargetValue& rhs_value,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updateColumn(catalog,
               td,
               cd,
               fragment_id,
               frag_offsets,
               std::vector<ScalarTargetValue>(1, rhs_value),
               rhs_type,
               memory_level,
               updel_roll);
}

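// Gather the chunks of all non-virtual columns of the given fragment, pinned at
// the requested memory level; returns the number of chunks gathered.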
static int get_chunks(const Catalog_Namespace::Catalog* catalog,
                      const TableDescriptor* td,
                      const FragmentInfo& fragment,
                      const Data_Namespace::MemoryLevel memory_level,
                      std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
    if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
      ++nc;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks.size();
}

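// Converters that turn the contents of an existing chunk back into columnar
// InsertData, so the variable-length update path can re-insert the updated rows.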
class ChunkToInsertDataConverter {
 public:
  virtual ~ChunkToInsertDataConverter() {}

  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;

  virtual void addDataBlocksToInsertData(
      Fragmenter_Namespace::InsertData& insertData) = 0;
};

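// Copies fixed-width scalar values out of a chunk buffer, widening them from the
// stored BUFFER_DATA_TYPE to the INSERT_DATA_TYPE expected on re-insert.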
template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
struct ScalarChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr =
      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
        checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~ScalarChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
    column_data_.get()[row] = insert_value;
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

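// Rebuilds the ArrayDatum for each row of a fixed-length array column; every
// element occupies a fixed stride in the data buffer.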
struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
  int8_t* data_buffer_addr_;
  size_t fixed_array_length_;

  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
  }

  ~FixedLenArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);

    bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
                                                        src_value_ptr);

    (*column_data_)[row] = ArrayDatum(
        fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.arraysPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

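// Variable-length array variant: element boundaries come from the index buffer,
// and a negative end offset marks a NULL array.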
struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
  ArrayOffsetT* index_buffer_addr_;

  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : FixedLenArrayChunkConverter(num_rows, chunk) {
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~ArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto startIndex = index_buffer_addr_[indexInFragment];
    auto endIndex = index_buffer_addr_[indexInFragment + 1];
    size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = ArrayDatum(
        src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
  }
};

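// Rebuilds std::string values for a none-encoded string column from the data
// buffer and the string offset (index) buffer.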
struct StringChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<std::string>> column_data_;
  const int8_t* data_buffer_addr_;
  const StringOffsetT* index_buffer_addr_;

  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~StringChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    size_t src_value_size =
        index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.stringsPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

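// Rescales DATE values stored as days to epoch seconds, since the re-insert path
// sends them through the DateDaysEncoder again (see the Q/A note in updateColumns).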
template <typename BUFFER_DATA_TYPE>
struct DateChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(
        reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~DateChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<int64_t>(buffer_value);
    column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

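// Variable-length update path: the updated rows are materialized as columnar
// InsertData (updated columns via the source-data converters, untouched columns
// via the chunk converters above), appended to the fragment with
// insertDataNoCheckpoint(), and the original rows are then marked as deleted.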
void InsertOrderFragmenter::updateColumns(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const int fragmentId,
    const std::vector<TargetMetaInfo> sourceMetaInfo,
    const std::vector<const ColumnDescriptor*> columnDescriptors,
    const RowDataProvider& sourceDataProvider,
    const size_t indexOffFragmentOffsetColumn,
    const Data_Namespace::MemoryLevel memoryLevel,
    UpdelRoll& updelRoll,
    Executor* executor) {
  updelRoll.is_varlen_update = true;
  updelRoll.catalog = catalog;
  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updelRoll.memoryLevel = memoryLevel;

  size_t num_entries = sourceDataProvider.getEntryCount();
  size_t num_rows = sourceDataProvider.getRowCount();

  if (0 == num_rows) {
    // bail out early
    return;
  }

  TargetValueConverterFactory factory;

  auto fragment_ptr = getFragmentInfo(fragmentId);
  auto& fragment = *fragment_ptr;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;

  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->getColumnDesc();

    if (chunk_cd->isDeletedCol) {
      deletedChunk = chunk;
      continue;
    }

    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });

    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);

      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];

      ConverterCreateParameter param{
          num_rows,
          *catalog,
          sourceDataMetaInfo,
          targetDescriptor,
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
          sourceDataProvider.getLiteralDictionary(),
          g_enable_experimental_string_functions
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().get_comp_param(),
                    executor->getRowSetMemoryOwner(),
                    true)
              : nullptr};
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);

      if (targetDescriptor->columnType.is_geometry()) {
        // geometry columns are composites
        // need to skip chunks, depending on geo type
        switch (targetDescriptor->columnType.get_type()) {
          case kMULTIPOLYGON:
            indexOfChunk += 5;
            break;
          case kPOLYGON:
            indexOfChunk += 4;
            break;
          case kLINESTRING:
            indexOfChunk += 2;
            break;
          case kPOINT:
            indexOfChunk += 1;
            break;
          default:
            CHECK(false);  // not supported
        }
      }
    } else {
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;

        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          // the logical geo column is a string column
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }

        chunkConverters.push_back(std::move(converter));

      } else if (chunk_cd->columnType.is_date_in_days()) {
        /* Q: Why do we need this?
           A: In the variable-length update path we move the chunk content of a
           column without decoding it. Since it passes through the DateDaysEncoder
           again, the expected value there is in seconds, but here it is in days.
           Therefore the DateChunkConverter scales chunk values to seconds, which
           are then ultimately encoded back to days by the DateDaysEncoder. */
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
        } else {
          CHECK(false);
        }
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();

        if (logical_type.is_string()) {
          // for dicts -> logical = physical
          logical_size = physical_size;
        }

        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == chunk_cd->columnType.get_size()) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        } else {
          CHECK(false);  // unknown
        }

        chunkConverters.push_back(std::move(converter));
      }
    }
  }

  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;

  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
                   deletedChunk->getColumnDesc()->tableId,
                   deletedChunk->getColumnDesc()->columnId,
                   fragment.fragmentId};
  updelRoll.dirtyChunkeys.insert(chunkey);
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());

  std::atomic<size_t> row_idx{0};

  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        &chunkConverters,
                        &deletedChunkBuffer,
                        &row_idx](size_t indexOfEntry) -> void {
    // convert the source data
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
    if (row.empty()) {
      return;
    }

    size_t indexOfRow = row_idx.fetch_add(1);

    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      }
    }

    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);

    // convert the remaining chunks
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
    }

    // now mark the row as deleted
    deletedChunkBuffer[indexInChunkBuffer] = true;
  };

  bool can_go_parallel = num_rows > 20000;

  if (can_go_parallel) {
    const size_t num_worker_threads = cpu_threads();
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, num_entries);
      worker_threads.push_back(std::async(
          std::launch::async,
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
              row_converter(indexOfRow);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : worker_threads) {
      child.wait();
    }

  } else {
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
    }
  }

  InsertData insert_data;
  insert_data.databaseId = catalog->getCurrentDB().dbId;
  insert_data.tableId = td->tableId;

  for (size_t i = 0; i < chunkConverters.size(); i++) {
    chunkConverters[i]->addDataBlocksToInsertData(insert_data);
  }

  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
    if (sourceDataConverters[i]) {
      sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
    }
  }

  insert_data.numRows = num_rows;
  insertDataNoCheckpoint(insert_data);

  // update metadata
  if (!deletedChunk->getBuffer()->hasEncoder()) {
    deletedChunk->initEncoder();
  }
  deletedChunk->getBuffer()->getEncoder()->updateStats(static_cast<int64_t>(true), false);

  auto& shadowDeletedChunkMeta =
      fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
  if (shadowDeletedChunkMeta->numElements >
      deletedChunk->getBuffer()->getEncoder()->getNumElems()) {
    // the append will have populated the shadow metadata;
    // otherwise, use the existing number of elements
    deletedChunk->getBuffer()->getEncoder()->setNumElems(
        shadowDeletedChunkMeta->numElements);
  }
  deletedChunk->getBuffer()->setUpdated();
}

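// In-place scalar update path: overwrites the target values directly in the
// column's chunk buffer, validating overflow and collecting per-thread
// min/max/has-null stats for the subsequent chunk-metadata update.
//
// A minimal sketch of a call (all names here are hypothetical):
//   fragmenter->updateColumn(&catalog, td, cd, /*fragment_id=*/0,
//                            std::vector<uint64_t>{0, 3, 7},   // rows to touch
//                            ScalarTargetValue(int64_t(42)),   // new value
//                            cd->columnType,
//                            Data_Namespace::MemoryLevel::CPU_LEVEL,
//                            updel_roll);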
void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const std::vector<ScalarTargetValue>& rhs_values,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updel_roll.catalog = catalog;
  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updel_roll.memoryLevel = memory_level;

  const size_t ncore = cpu_threads();
  const auto nrow = frag_offsets.size();
  const auto n_rhs_values = rhs_values.size();
  if (0 == nrow) {
    return;
  }
  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);

  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
  ChunkKey chunk_key{
      catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
  auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                         &catalog->getDataMgr(),
                                         chunk_key,
                                         memory_level,
                                         0,
                                         chunk_meta_it->second->numBytes,
                                         chunk_meta_it->second->numElements);

  std::vector<int8_t> has_null_per_thread(ncore, 0);
  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());

  // parallel update elements
  std::vector<std::future<void>> threads;

  const auto segsz = (nrow + ncore - 1) / ncore;
  auto dbuf = chunk->getBuffer();
  auto dbuf_addr = dbuf->getMemoryPtr();
  dbuf->setUpdated();
  {
    std::lock_guard<std::mutex> lck(updel_roll.mutex);
    if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
      updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
    }

    ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
                     cd->tableId,
                     cd->columnId,
                     fragment.fragmentId};
    updel_roll.dirtyChunkeys.insert(chunkey);
  }
  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
    threads.emplace_back(std::async(
        std::launch::async,
        [=,
         &has_null_per_thread,
         &min_int64t_per_thread,
         &max_int64t_per_thread,
         &min_double_per_thread,
         &max_double_per_thread,
         &frag_offsets,
         &rhs_values] {
          SQLTypeInfo lhs_type = cd->columnType;

          // !! not sure if this is an undocumented convention or a bug, but for a
          // sharded table the dictionary id of an encoded string column is not
          // specified by comp_param in the physical table but somehow in the
          // logical table :) comp_param in the physical table is always 0, so we
          // need to adapt accordingly...
          auto cdl = (shard_ < 0)
                         ? cd
                         : catalog->getMetadataForColumn(
                               catalog->getLogicalTableId(td->tableId), cd->columnId);
          CHECK(cdl);
          DecimalOverflowValidator decimalOverflowValidator(lhs_type);
          NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
              lhs_type, &decimalOverflowValidator);
          DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
          NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
              lhs_type, &dateDaysOverflowValidator);

          StringDictionary* stringDict{nullptr};
          if (lhs_type.is_string()) {
            CHECK(kENCODING_DICT == lhs_type.get_compression());
            auto dictDesc = const_cast<DictDescriptor*>(
                catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
            CHECK(dictDesc);
            stringDict = dictDesc->stringDict.get();
            CHECK(stringDict);
          }

          for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
            const auto roffs = frag_offsets[r];
            auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
            auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
            ScalarTargetValue sv2;

            // Subtle here are the two cases of string-to-string assignment, when
            // upstream passes the RHS string as a string index instead of a
            // preferred "real string".
            // case #1. For "SET str_col = str_literal", it is hard to resolve a
            //          temp str index in this layer, so if upstream passes a str
            //          idx here, an exception is thrown.
            // case #2. For "SET str_col1 = str_col2", the RHS str idx is converted
            //          to the LHS str idx.
            if (rhs_type.is_string()) {
              if (const auto vp = boost::get<int64_t>(sv)) {
                auto dictDesc = const_cast<DictDescriptor*>(
                    catalog->getMetadataForDict(rhs_type.get_comp_param()));
                if (nullptr == dictDesc) {
                  throw std::runtime_error(
                      "UPDATE does not support cast from string literal to string "
                      "column.");
                }
                auto stringDict = dictDesc->stringDict.get();
                CHECK(stringDict);
                sv2 = NullableString(stringDict->getString(*vp));
                sv = &sv2;
              }
            }

            if (const auto vp = boost::get<int64_t>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
              if (lhs_type.is_decimal()) {
                nullAwareDecimalOverflowValidator.validate<int64_t>(v);
                int64_t decimal_val;
                get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  (v == inline_int_null_value<int64_t>() &&
                                   lhs_type.get_notnull() == false)
                                      ? v
                                      : decimal_val);
                auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
                auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
                if (positive_v_and_negative_d || negative_v_and_positive_d) {
                  throw std::runtime_error(
                      "Data conversion overflow on " + std::to_string(v) +
                      " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) +
                      ", " + std::to_string(rhs_type.get_scale()) + ") to (" +
                      std::to_string(lhs_type.get_dimension()) + ", " +
                      std::to_string(lhs_type.get_scale()) + ")");
                }
              } else if (is_integral(lhs_type)) {
                if (lhs_type.is_date_in_days()) {
                  // Store meta values in seconds
                  if (lhs_type.get_size() == 2) {
                    nullAwareDateOverflowValidator.validate<int16_t>(v);
                  } else {
                    nullAwareDateOverflowValidator.validate<int32_t>(v);
                  }
                  int64_t days;
                  get_scalar<int64_t>(data_ptr, lhs_type, days);
                  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    (v == inline_int_null_value<int64_t>() &&
                                     lhs_type.get_notnull() == false)
                                        ? NullSentinelSupplier()(lhs_type, v)
                                        : seconds);
                } else {
                  int64_t target_value;
                  if (rhs_type.is_decimal()) {
                    target_value = round(decimal_to_double(rhs_type, v));
                  } else {
                    target_value = v;
                  }
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    target_value);
                }
              } else {
                tabulate_metadata(
                    lhs_type,
                    min_double_per_thread[c],
                    max_double_per_thread[c],
                    has_null_per_thread[c],
                    rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
              }
            } else if (const auto vp = boost::get<double>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else if (lhs_type.is_fp()) {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              } else {
                UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                                 "LHS with a floating RHS.";
              }
            } else if (const auto vp = boost::get<float>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              }
            } else if (const auto vp = boost::get<NullableString>(sv)) {
              const auto s = boost::get<std::string>(vp);
              const auto sval = s ? *s : std::string("");
              if (lhs_type.is_string()) {
                decltype(stringDict->getOrAdd(sval)) sidx;
                {
                  std::unique_lock<std::mutex> lock(temp_mutex_);
                  sidx = stringDict->getOrAdd(sval);
                }
                put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(sidx));
              } else if (sval.size() > 0) {
                auto dval = std::atof(sval.data());
                if (lhs_type.is_boolean()) {
                  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
                } else if (lhs_type.is_time()) {
                  throw std::runtime_error(
                      "Date/Time/Timestamp update not supported through translated "
                      "string path.");
                }
                if (lhs_type.is_fp() || lhs_type.is_decimal()) {
                  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_double_per_thread[c],
                                    max_double_per_thread[c],
                                    has_null_per_thread[c],
                                    double(dval));
                } else {
                  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    int64_t(dval));
                }
              } else {
                put_null(data_ptr, lhs_type, cd->columnName);
                has_null_per_thread[c] = true;
              }
            } else {
              CHECK(false);
            }
          }
        }));
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }
  wait_cleanup_threads(threads);

  // for unit test
  if (FragmentInfo::unconditionalVacuum_) {
    if (cd->isDeletedCol) {
      const auto deleted_offsets = getVacuumOffsets(chunk);
      if (deleted_offsets.size() > 0) {
        compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
        return;
      }
    }
  }
  bool has_null_per_chunk{false};
  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
  double min_double_per_chunk{std::numeric_limits<double>::max()};
  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
  for (size_t c = 0; c < ncore; ++c) {
    has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
    max_double_per_chunk =
        std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
    min_double_per_chunk =
        std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
    max_int64t_per_chunk =
        std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
    min_int64t_per_chunk =
        std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
  }
  updateColumnMetadata(cd,
                       fragment,
                       chunk,
                       has_null_per_chunk,
                       max_double_per_chunk,
                       min_double_per_chunk,
                       max_int64t_per_chunk,
                       min_int64t_per_chunk,
                       cd->columnType,
                       updel_roll);
}

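// Folds the per-chunk stats collected by an update into the shadow chunk
// metadata tracked by the UpdelRoll, to be published on commit.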
void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
                                                 FragmentInfo& fragment,
                                                 std::shared_ptr<Chunk_NS::Chunk> chunk,
                                                 const bool has_null_per_chunk,
                                                 const double max_double_per_chunk,
                                                 const double min_double_per_chunk,
                                                 const int64_t max_int64t_per_chunk,
                                                 const int64_t min_int64t_per_chunk,
                                                 const SQLTypeInfo& rhs_type,
                                                 UpdelRoll& updel_roll) {
  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
  auto key = std::make_pair(td, &fragment);
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  if (0 == updel_roll.numTuples.count(key)) {
    updel_roll.numTuples[key] = fragment.shadowNumTuples;
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];

  auto buffer = chunk->getBuffer();
  const auto& lhs_type = cd->columnType;

  auto encoder = buffer->getEncoder();
  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
    if (has_null) {
      encoder->updateStats(decltype(min)(), true);
    }
    if (max < min) {
      return;
    }
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
  };

  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_fp()) {
    update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_decimal()) {
    update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
                 (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
                 has_null_per_chunk);
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  }
  buffer->getEncoder()->getMetadata(chunkMetadata[cd->columnId]);
}

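// On commit, publishes the shadow chunk metadata and tuple counts from the
// UpdelRoll into the fragment.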
void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
                                           const MetaDataKey& key,
                                           UpdelRoll& updel_roll) {
  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  if (updel_roll.chunkMetadata.count(key)) {
    auto& fragmentInfo = *key.second;
    const auto& chunkMetadata = updel_roll.chunkMetadata[key];
    fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
    fragmentInfo.setChunkMetadataMap(chunkMetadata);
    fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
    fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
  }
}

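// Member counterpart of get_chunks() above; returns the pinned chunks of all
// non-virtual columns of the fragment.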
auto InsertOrderFragmenter::getChunksForAllColumns(
    const TableDescriptor* td,
    const FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel memory_level) {
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  // coming from updateColumn (on the '$delete$' column) we don't have chunks for all
  // columns
  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
    if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
      ++ncol;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks;
}

// get a sorted vector of offsets of rows to vacuum
const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
    const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
  const auto data_buffer = chunk->getBuffer();
  const auto data_addr = data_buffer->getMemoryPtr();
  const size_t nrows_in_chunk = data_buffer->size();
  const size_t ncore = cpu_threads();
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
  std::vector<std::vector<uint64_t>> deleted_offsets;
  deleted_offsets.resize(ncore);
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
      const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
      const auto ithread = rbegin / segsz;
      CHECK(ithread < deleted_offsets.size());
      deleted_offsets[ithread].reserve(segsz);
      for (size_t r = rbegin; r < rend; ++r) {
        if (data_addr[r]) {
          deleted_offsets[ithread].push_back(r);
        }
      }
    }));
  }
  wait_cleanup_threads(threads);
  std::vector<uint64_t> all_deleted_offsets;
  for (size_t i = 0; i < ncore; ++i) {
    all_deleted_offsets.insert(
        all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
  }
  return all_deleted_offsets;
}

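// Folds a single element's value into the running min/max/has-null stats of a
// chunk being vacuumed.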
template <typename T>
static void set_chunk_stats(const SQLTypeInfo& col_type,
                            int8_t* data_addr,
                            int8_t& has_null,
                            T& min,
                            T& max) {
  T v;
  const auto can_be_null = !col_type.get_notnull();
  const auto is_null = get_scalar<T>(data_addr, col_type, v);
  if (is_null) {
    has_null = has_null || (can_be_null && is_null);
  } else {
    set_minmax(min, max, v);
  }
}

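// Records the post-vacuum element count and byte size of a chunk in the
// UpdelRoll's shadow metadata and marks the chunk dirty.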
static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
                               FragmentInfo& fragment,
                               const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                               const size_t nrows_to_keep,
                               UpdelRoll& updel_roll) {
  auto cd = chunk->getColumnDesc();
  auto td = catalog->getMetadataForTable(cd->tableId);
  auto data_buffer = chunk->getBuffer();
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  const auto key = std::make_pair(td, &fragment);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];
  chunkMetadata[cd->columnId]->numElements = nrows_to_keep;
  chunkMetadata[cd->columnId]->numBytes = data_buffer->size();
  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
    updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
  }
}

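// Compacts a fixed-length column in place: each block of surviving rows is
// memmove'd toward the front of the data buffer. Returns the number of bytes kept.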
auto vacuum_fixlen_rows(
    const FragmentInfo& fragment,
    const std::shared_ptr<Chunk_NS::Chunk>& chunk,
    const std::vector<uint64_t>& frag_offsets) {
  const auto cd = chunk->getColumnDesc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->getBuffer();
  auto data_addr = data_buffer->getMemoryPtr();
  auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}

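// Variable-length counterpart: moves the surviving payloads toward the front of
// the data buffer and rewrites the offset (index) array to match. Returns the
// number of payload bytes kept.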
auto vacuum_varlen_rows(
    const FragmentInfo& fragment,
    const std::shared_ptr<Chunk_NS::Chunk>& chunk,
    const std::vector<uint64_t>& frag_offsets) {
  auto data_buffer = chunk->getBuffer();
  auto index_buffer = chunk->getIndexBuf();
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
  auto index_array = (StringOffsetT*)indices_addr;
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  size_t nbytes_var_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
          index_array[irow_of_blk_to_keep];
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr varlen row block toward front
        memmove(data_addr + ibyte_var_data_to_keep,
                data_addr + index_array[irow_of_blk_to_keep],
                nbytes_to_keep);

        const auto index_base = index_array[irow_of_blk_to_keep];
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto& index = index_array[irow_of_blk_to_keep + i];
          index = ibyte_var_data_to_keep + (index - index_base);
        }
      }
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;

      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_var_data_to_keep;
}

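// Physically removes rows previously marked as deleted: vacuums every column
// chunk of the fragment in parallel, then refreshes tuple counts and chunk
// metadata through the UpdelRoll.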
void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
                                        const TableDescriptor* td,
                                        const int fragment_id,
                                        const std::vector<uint64_t>& frag_offsets,
                                        const Data_Namespace::MemoryLevel memory_level,
                                        UpdelRoll& updel_roll) {
  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
  const auto ncol = chunks.size();

  std::vector<int8_t> has_null_per_thread(ncol, 0);
  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());

  // parallel delete columns
  std::vector<std::future<void>> threads;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;

  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    const auto cd = chunk->getColumnDesc();
    const auto& col_type = cd->columnType;
    auto data_buffer = chunk->getBuffer();
    auto index_buffer = chunk->getIndexBuf();
    auto data_addr = data_buffer->getMemoryPtr();
    auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
    auto index_array = (StringOffsetT*)indices_addr;
    bool is_varlen = col_type.is_varlen_indeed();

    auto fixlen_vacuum = [=,
                          &has_null_per_thread,
                          &max_double_per_thread,
                          &min_double_per_thread,
                          &min_int64t_per_thread,
                          &max_int64t_per_thread,
                          &updel_roll,
                          &frag_offsets,
                          &fragment] {
      size_t nbytes_fix_data_to_keep;
      nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);

      data_buffer->getEncoder()->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_fix_data_to_keep);
      data_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);

      auto daddr = data_addr;
      auto element_size =
          col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
      for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
        if (col_type.is_fixlen_array()) {
          auto encoder =
              dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->getEncoder());
          CHECK(encoder);
          encoder->updateMetadata((int8_t*)daddr);
        } else if (col_type.is_fp()) {
          set_chunk_stats(col_type,
                          daddr,
                          has_null_per_thread[ci],
                          min_double_per_thread[ci],
                          max_double_per_thread[ci]);
        } else {
          set_chunk_stats(col_type,
                          daddr,
                          has_null_per_thread[ci],
                          min_int64t_per_thread[ci],
                          max_int64t_per_thread[ci]);
        }
      }
    };

    auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_var_data_to_keep;
      nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);

      data_buffer->getEncoder()->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_var_data_to_keep);
      data_buffer->setUpdated();

      index_array[nrows_to_keep] = data_buffer->size();
      index_buffer->setSize(sizeof(*index_array) *
                            (nrows_to_keep ? 1 + nrows_to_keep : 0));
      index_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
    };

    if (is_varlen) {
      threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
    } else {
      threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
    }
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }

  wait_cleanup_threads(threads);

  auto key = std::make_pair(td, &fragment);
  updel_roll.numTuples[key] = nrows_to_keep;
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    auto cd = chunk->getColumnDesc();
    if (!cd->columnType.is_fixlen_array()) {
      updateColumnMetadata(cd,
                           fragment,
                           chunk,
                           has_null_per_thread[ci],
                           max_double_per_thread[ci],
                           min_double_per_thread[ci],
                           max_int64t_per_thread[ci],
                           min_int64t_per_thread[ci],
                           cd->columnType,
                           updel_roll);
    }
  }
}

} // namespace Fragmenter_Namespace

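// Commits an update/delete: checkpoints the table (single-node, disk-resident
// tables only) and publishes the shadow metadata of every dirty fragment.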
void UpdelRoll::commitUpdate() {
  if (nullptr == catalog) {
    return;
  }
  const auto td = catalog->getMetadataForTable(logicalTableId);
  CHECK(td);
  ChunkKey chunk_key{catalog->getDatabaseId(), td->tableId};
  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);

  // Checkpoint all shards. Otherwise, epochs can go out of sync. Also, for distributed
  // mode, do not checkpoint here, since the aggregator handles checkpointing.
  if (!g_cluster && td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    auto table_epochs = catalog->getTableEpochs(catalog->getDatabaseId(), logicalTableId);
    try {
      // `checkpointWithAutoRollback` is not called here because, if a failure occurs,
      // `dirtyChunks` has to be cleared before resetting epochs
      catalog->checkpoint(logicalTableId);
    } catch (...) {
      dirtyChunks.clear();
      const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpochsLogExceptions(
          catalog->getDatabaseId(), table_epochs);
      throw;
    }
  }

  // for each dirty fragment
  for (auto& cm : chunkMetadata) {
    cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
  }
  dirtyChunks.clear();
  // flush gpu dirty chunks if update was not on gpu
  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
    for (const auto& chunkey : dirtyChunkeys) {
      catalog->getDataMgr().deleteChunksWithPrefix(
          chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
    }
  }
}

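// Rolls an update back: varlen updates reset the table epochs to their
// pre-update values; otherwise transient chunk buffers are freed.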
void UpdelRoll::cancelUpdate() {
  if (nullptr == catalog) {
    return;
  }

  // TODO: needed?
  ChunkKey chunk_key{catalog->getDatabaseId(), logicalTableId};
  const auto table_lock = lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key);
  if (is_varlen_update) {
    int databaseId = catalog->getDatabaseId();
    auto table_epochs = catalog->getTableEpochs(databaseId, logicalTableId);

    dirtyChunks.clear();
    const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpochs(databaseId,
                                                                     table_epochs);
  } else {
    const auto td = catalog->getMetadataForTable(logicalTableId);
    CHECK(td);
    if (td->persistenceLevel != memoryLevel) {
      for (auto dit : dirtyChunks) {
        catalog->getDataMgr().free(dit.first->getBuffer());
        dit.first->setBuffer(nullptr);
      }
    }
  }
}