OmniSciDB  5ade3759e0
UpdelStorage.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <boost/variant.hpp>
18 #include <boost/variant/get.hpp>
19 #include <limits>
20 #include <mutex>
21 #include <string>
22 #include <vector>
23 
24 #include "Catalog/Catalog.h"
25 #include "DataMgr/DataMgr.h"
29 #include "Shared/ConfigResolve.h"
30 #include "Shared/DateConverters.h"
31 #include "Shared/Logger.h"
32 #include "Shared/TypedDataAccessors.h"
33 #include "Shared/thread_count.h"
35 
36 namespace Fragmenter_Namespace {
37 
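// Waits on every outstanding future (get() also rethrows any exception raised in the
// worker thread), then clears the vector so it can be reused for the next batch of tasks.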
38 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
39  for (auto& t : threads) {
40  t.get();
41  }
42  threads.clear();
43 }
44 
45 FragmentInfo& InsertOrderFragmenter::getFragmentInfoFromId(const int fragment_id) {
46  auto fragment_it = std::find_if(
47  fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
48  return f.fragmentId == fragment_id;
49  });
50  CHECK(fragment_it != fragmentInfoVec_.end());
51  return *fragment_it;
52 }
53 
54 inline bool is_integral(const SQLTypeInfo& t) {
55  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
56 }
57 
58 bool FragmentInfo::unconditionalVacuum_{false};
59 
60 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
61  const std::string& tab_name,
62  const std::string& col_name,
63  const int fragment_id,
64  const std::vector<uint64_t>& frag_offsets,
65  const std::vector<ScalarTargetValue>& rhs_values,
66  const SQLTypeInfo& rhs_type,
67  const Data_Namespace::MemoryLevel memory_level,
68  UpdelRoll& updel_roll) {
69  const auto td = catalog->getMetadataForTable(tab_name);
70  CHECK(td);
71  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
72  CHECK(cd);
73  td->fragmenter->updateColumn(catalog,
74  td,
75  cd,
76  fragment_id,
77  frag_offsets,
78  rhs_values,
79  rhs_type,
80  memory_level,
81  updel_roll);
82 }
83 
84 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
85  const TableDescriptor* td,
86  const ColumnDescriptor* cd,
87  const int fragment_id,
88  const std::vector<uint64_t>& frag_offsets,
89  const ScalarTargetValue& rhs_value,
90  const SQLTypeInfo& rhs_type,
91  const Data_Namespace::MemoryLevel memory_level,
92  UpdelRoll& updel_roll) {
93  updateColumn(catalog,
94  td,
95  cd,
96  fragment_id,
97  frag_offsets,
98  std::vector<ScalarTargetValue>(1, rhs_value),
99  rhs_type,
100  memory_level,
101  updel_roll);
102 }
103 
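// Pins every non-virtual column chunk of the given fragment at the requested memory
// level. Column ids may be sparse, so cid keeps advancing until nColumns real column
// descriptors have been seen; the return value is the number of chunks collected.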
104 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
105  const TableDescriptor* td,
106  const FragmentInfo& fragment,
107  const Data_Namespace::MemoryLevel memory_level,
108  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
109  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
110  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
111  ++nc;
112  if (!cd->isVirtualCol) {
113  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
114  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
115  ChunkKey chunk_key{
116  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
117  auto chunk = Chunk_NS::Chunk::getChunk(cd,
118  &catalog->getDataMgr(),
119  chunk_key,
120  memory_level,
121  0,
122  chunk_meta_it->second.numBytes,
123  chunk_meta_it->second.numElements);
124  chunks.push_back(chunk);
125  }
126  }
127  }
128  return chunks.size();
129 }
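// The converters below lift existing chunk contents into columnar InsertData blocks.
// The variable-length UPDATE path (updateColumns) rewrites whole rows, so columns that
// are not being updated still have to be re-materialized and re-inserted through them.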
130 
131 class ChunkToInsertDataConverter {
132  public:
133  virtual ~ChunkToInsertDataConverter() {}
134 
135  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
136 
137  virtual void addDataBlocksToInsertData(
138      Fragmenter_Namespace::InsertData& insertData) = 0;
139 };
140 
141 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
142 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
143  using ColumnDataPtr =
144  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
145 
146  const Chunk_NS::Chunk* chunk_;
147  const ColumnDescriptor* column_descriptor_;
148  ColumnDataPtr column_data_;
149  const BUFFER_DATA_TYPE* data_buffer_addr_;
150 
151  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
152  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
153  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
154  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
155  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
156  }
157 
158  ~ScalarChunkConverter() override {}
159 
160  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
161  auto buffer_value = data_buffer_addr_[indexInFragment];
162  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
163  column_data_.get()[row] = insert_value;
164  }
165 
166  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
167  DataBlockPtr dataBlock;
168  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
169  insertData.data.push_back(dataBlock);
170  insertData.columnIds.push_back(column_descriptor_->columnId);
171  }
172 };
173 
177 
178  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
179  const int8_t* data_buffer_addr_;
180  size_t fixed_array_length_;
181 
182  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
183  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
184  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
185  data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
186  fixed_array_length_ = chunk->get_column_desc()->columnType.get_size();
187  }
188 
189  ~FixedLenArrayChunkConverter() override {}
190 
191  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
192  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
193  (*column_data_)[row] =
194  ArrayDatum(fixed_array_length_, (int8_t*)src_value_ptr, DoNothingDeleter());
195  }
196 
197  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
198  DataBlockPtr dataBlock;
199  dataBlock.arraysPtr = column_data_.get();
200  insertData.data.push_back(dataBlock);
201  insertData.columnIds.push_back(column_descriptor_->columnId);
202  }
203 };
204 
205 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
206  StringOffsetT* index_buffer_addr_;
207 
208  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
209  : FixedLenArrayChunkConverter(num_rows, chunk) {
210  index_buffer_addr_ =
211  (StringOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
212  : nullptr);
213  }
214 
215  ~ArrayChunkConverter() override {}
216 
217  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
218  size_t src_value_size =
219  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
220  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
221  (*column_data_)[row] =
222  ArrayDatum(src_value_size, (int8_t*)src_value_ptr, DoNothingDeleter());
223  }
224 };
225 
226 struct StringChunkConverter : public ChunkToInsertDataConverter {
227  const Chunk_NS::Chunk* chunk_;
228  const ColumnDescriptor* column_descriptor_;
229 
230  std::unique_ptr<std::vector<std::string>> column_data_;
231  const int8_t* data_buffer_addr_;
232  const StringOffsetT* index_buffer_addr_;
233 
234  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
235  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
236  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
237  data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
238  index_buffer_addr_ =
239  (StringOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
240  : nullptr);
241  }
242 
243  ~StringChunkConverter() override {}
244 
245  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
246  size_t src_value_size =
247  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
248  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
249  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
250  }
251 
252  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
253  DataBlockPtr dataBlock;
254  dataBlock.stringsPtr = column_data_.get();
255  insertData.data.push_back(dataBlock);
256  insertData.columnIds.push_back(column_descriptor_->columnId);
257  }
258 };
259 
260 template <typename BUFFER_DATA_TYPE>
261 struct DateChunkConverter : public ChunkToInsertDataConverter {
262  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
263 
264  const Chunk_NS::Chunk* chunk_;
265  const ColumnDescriptor* column_descriptor_;
266  ColumnDataPtr column_data_;
267  const BUFFER_DATA_TYPE* data_buffer_addr_;
268 
269  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
270  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
271  column_data_ = ColumnDataPtr(
272  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
273  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
274  }
275 
276  ~DateChunkConverter() override {}
277 
278  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
279  auto buffer_value = data_buffer_addr_[indexInFragment];
280  auto insert_value = static_cast<int64_t>(buffer_value);
281  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
282  }
283 
284  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
285  DataBlockPtr dataBlock;
286  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
287  insertData.data.push_back(dataBlock);
288  insertData.columnIds.push_back(column_descriptor_->columnId);
289  }
290 };
291 
292 void InsertOrderFragmenter::updateColumns(
293  const Catalog_Namespace::Catalog* catalog,
294  const TableDescriptor* td,
295  const int fragmentId,
296  const std::vector<TargetMetaInfo> sourceMetaInfo,
297  const std::vector<const ColumnDescriptor*> columnDescriptors,
298  const RowDataProvider& sourceDataProvider,
299  const size_t indexOffFragmentOffsetColumn,
300  const Data_Namespace::MemoryLevel memoryLevel,
301  UpdelRoll& updelRoll) {
302  if (!is_feature_enabled<VarlenUpdates>()) {
303  throw std::runtime_error("varlen UPDATE path not enabled.");
304  }
305 
306  updelRoll.is_varlen_update = true;
307  updelRoll.catalog = catalog;
308  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
309  updelRoll.memoryLevel = memoryLevel;
310 
311  size_t num_rows = sourceDataProvider.count();
312 
313  if (0 == num_rows) {
314  // bail out early
315  return;
316  }
317 
318  TargetValueConverterFactory factory;
319 
320  auto& fragment = getFragmentInfoFromId(fragmentId);
321  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
322  get_chunks(catalog, td, fragment, memoryLevel, chunks);
323  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
324  columnDescriptors.size());
325  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
326 
327  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
328  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
329  auto chunk = chunks[indexOfChunk];
330  const auto chunk_cd = chunk->get_column_desc();
331 
332  if (chunk_cd->isDeletedCol) {
333  deletedChunk = chunk;
334  continue;
335  }
336 
337  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
338  columnDescriptors.end(),
339  [=](const ColumnDescriptor* cd) -> bool {
340  return cd->columnId == chunk_cd->columnId;
341  });
342 
343  if (targetColumnIt != columnDescriptors.end()) {
344  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
345 
346  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
347  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
348 
349  ConverterCreateParameter param{num_rows,
350  *catalog,
351  sourceDataMetaInfo,
352  targetDescriptor,
353  targetDescriptor->columnType,
354  !targetDescriptor->columnType.get_notnull(),
355  sourceDataProvider.getLiteralDictionary()};
356  auto converter = factory.create(param);
357  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
358 
359  if (targetDescriptor->columnType.is_geometry()) {
360  // geometry columns are composites
361  // need to skip chunks, depending on geo type
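 // The skip counts below match the number of physical chunks that back each geo type
 // (coords plus, as the type gets more complex, bounds, ring sizes, render group and
 // poly rings), so the loop does not try to convert those auxiliary chunks twice.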
362  switch (targetDescriptor->columnType.get_type()) {
363  case kMULTIPOLYGON:
364  indexOfChunk += 5;
365  break;
366  case kPOLYGON:
367  indexOfChunk += 4;
368  break;
369  case kLINESTRING:
370  indexOfChunk += 2;
371  break;
372  case kPOINT:
373  indexOfChunk += 1;
374  break;
375  default:
376  CHECK(false); // not supported
377  }
378  }
379  } else {
380  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
381  std::unique_ptr<ChunkToInsertDataConverter> converter;
382 
383  if (chunk_cd->columnType.is_fixlen_array()) {
384  converter =
385  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
386  } else if (chunk_cd->columnType.is_string()) {
387  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
388  } else if (chunk_cd->columnType.is_geometry()) {
389  // the logical geo column is a string column
390  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
391  } else {
392  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
393  }
394 
395  chunkConverters.push_back(std::move(converter));
396 
397  } else if (chunk_cd->columnType.is_date_in_days()) {
398  /* Q: Why do we need this?
399     A: In the variable-length update path we move the chunk contents of a column
400        without decoding them. Since the data passes through DateDaysEncoder again,
401        the expected input is in seconds, but here it is in days. DateChunkConverter
402        therefore scales the chunk values to seconds, which DateDaysEncoder then
403        re-encodes as days.
404  */
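 // Example of the scaling DateChunkConverter performs: a stored value of 17898 (days
 // since epoch) is handed on as 17898 * 86400 = 1546387200 seconds.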
405  std::unique_ptr<ChunkToInsertDataConverter> converter;
406  const size_t physical_size = chunk_cd->columnType.get_size();
407  if (physical_size == 2) {
408  converter =
409  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
410  } else if (physical_size == 4) {
411  converter =
412  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
413  } else {
414  CHECK(false);
415  }
416  chunkConverters.push_back(std::move(converter));
417  } else {
418  std::unique_ptr<ChunkToInsertDataConverter> converter;
419  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
420  int logical_size = logical_type.get_size();
421  int physical_size = chunk_cd->columnType.get_size();
422 
423  if (logical_type.is_string()) {
424  // for dicts -> logical = physical
425  logical_size = physical_size;
426  }
427 
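 // Pick a ScalarChunkConverter<BUFFER_DATA_TYPE, INSERT_DATA_TYPE> whose first type
 // matches the encoded (physical) width stored in the chunk and whose second matches
 // the decoded (logical) width, so fixed-encoding columns are widened as they are read.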
428  if (8 == physical_size) {
429  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
430  num_rows, chunk.get());
431  } else if (4 == physical_size) {
432  if (8 == logical_size) {
433  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
434  num_rows, chunk.get());
435  } else {
436  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
437  num_rows, chunk.get());
438  }
439  } else if (2 == chunk_cd->columnType.get_size()) {
440  if (8 == logical_size) {
441  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
442  num_rows, chunk.get());
443  } else if (4 == logical_size) {
444  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
445  num_rows, chunk.get());
446  } else {
447  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
448  num_rows, chunk.get());
449  }
450  } else if (1 == chunk_cd->columnType.get_size()) {
451  if (8 == logical_size) {
452  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
453  num_rows, chunk.get());
454  } else if (4 == logical_size) {
455  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
456  num_rows, chunk.get());
457  } else if (2 == logical_size) {
458  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
459  num_rows, chunk.get());
460  } else {
461  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
462  num_rows, chunk.get());
463  }
464  } else {
465  CHECK(false); // unknown
466  }
467 
468  chunkConverters.push_back(std::move(converter));
469  }
470  }
471  }
472 
473  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
474  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
475 
476  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
477  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
478  deletedChunk->get_column_desc()->tableId,
479  deletedChunk->get_column_desc()->columnId,
480  fragment.fragmentId};
481  updelRoll.dirtyChunkeys.insert(chunkey);
482  bool* deletedChunkBuffer =
483  reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());
484 
485  auto row_converter = [&sourceDataProvider,
486  &sourceDataConverters,
487  &indexOffFragmentOffsetColumn,
488  &chunkConverters,
489  &deletedChunkBuffer](size_t indexOfRow) -> void {
490  // convert the source data
491  const auto row = sourceDataProvider.getEntryAt(indexOfRow);
492 
493  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
494  if (sourceDataConverters[col]) {
495  const auto& mapd_variant = row[col];
496  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
497  }
498  }
499 
500  auto scalar = checked_get(
501  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
502  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
503 
504  // convert the remaining chunks
505  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
506  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
507  }
508 
509  // now mark the row as deleted
510  deletedChunkBuffer[indexInChunkBuffer] = true;
511  };
512 
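 // Run the per-row conversion in parallel only when the row count is large enough for
 // the extra threads to pay for themselves; rows are split into equal contiguous
 // stripes, one async task per stripe.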
513  bool can_go_parallel = num_rows > 20000;
514 
515  if (can_go_parallel) {
516  const size_t num_worker_threads = cpu_threads();
517  std::vector<std::future<void>> worker_threads;
518  for (size_t i = 0,
519  start_entry = 0,
520  stride = (num_rows + num_worker_threads - 1) / num_worker_threads;
521  i < num_worker_threads && start_entry < num_rows;
522  ++i, start_entry += stride) {
523  const auto end_entry = std::min(start_entry + stride, num_rows);
524  worker_threads.push_back(std::async(
525  std::launch::async,
526  [&row_converter](const size_t start, const size_t end) {
527  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
528  row_converter(indexOfRow);
529  }
530  },
531  start_entry,
532  end_entry));
533  }
534 
535  for (auto& child : worker_threads) {
536  child.wait();
537  }
538 
539  } else {
540  for (size_t indexOfRow = 0; indexOfRow < num_rows; indexOfRow++) {
541  row_converter(indexOfRow);
542  }
543  }
544 
545  InsertData insert_data;
546  insert_data.databaseId = catalog->getCurrentDB().dbId;
547  insert_data.tableId = td->tableId;
548 
549  for (size_t i = 0; i < chunkConverters.size(); i++) {
550  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
551  continue;
552  }
553 
554  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
555  if (sourceDataConverters[i]) {
556  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
557  }
558  continue;
559  }
560 
561  insert_data.numRows = num_rows;
562  insertDataNoCheckpoint(insert_data);
563 
564  // update metadata
565  if (!deletedChunk->get_buffer()->hasEncoder) {
566  deletedChunk->init_encoder();
567  }
568  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);
569 
570  auto& shadowDeletedChunkMeta =
571  fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
572  if (shadowDeletedChunkMeta.numElements >
573  deletedChunk->get_buffer()->encoder->getNumElems()) {
574  // the append will have populated shadow meta data, otherwise use existing num
575  // elements
576  deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
577  }
578  deletedChunk->get_buffer()->setUpdated();
579 }
580 
581 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
582  const TableDescriptor* td,
583  const ColumnDescriptor* cd,
584  const int fragment_id,
585  const std::vector<uint64_t>& frag_offsets,
586  const std::vector<ScalarTargetValue>& rhs_values,
587  const SQLTypeInfo& rhs_type,
588  const Data_Namespace::MemoryLevel memory_level,
589  UpdelRoll& updel_roll) {
590  updel_roll.catalog = catalog;
591  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
592  updel_roll.memoryLevel = memory_level;
593 
594  const size_t ncore = cpu_threads();
595  const auto nrow = frag_offsets.size();
596  const auto n_rhs_values = rhs_values.size();
597  if (0 == nrow) {
598  return;
599  }
600  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
601 
602  auto& fragment = getFragmentInfoFromId(fragment_id);
603  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
604  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
605  ChunkKey chunk_key{
606  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
607  auto chunk = Chunk_NS::Chunk::getChunk(cd,
608  &catalog->getDataMgr(),
609  chunk_key,
610  memory_level,
611  0,
612  chunk_meta_it->second.numBytes,
613  chunk_meta_it->second.numElements);
614 
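 // Per-worker running statistics; each async task below writes only its own slot c,
 // and the slots are folded into a single per-chunk min/max/has-null after all
 // workers have finished.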
615  std::vector<int8_t> has_null_per_thread(ncore, 0);
616  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
617  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
618  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
619  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
620 
621  // parallel update elements
622  std::vector<std::future<void>> threads;
623 
624  const auto segsz = (nrow + ncore - 1) / ncore;
625  auto dbuf = chunk->get_buffer();
626  auto dbuf_addr = dbuf->getMemoryPtr();
627  dbuf->setUpdated();
628  {
629  std::lock_guard<std::mutex> lck(updel_roll.mutex);
630  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
631  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
632  }
633 
634  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
635  cd->tableId,
636  cd->columnId,
637  fragment.fragmentId};
638  updel_roll.dirtyChunkeys.insert(chunkey);
639  }
640  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
641  threads.emplace_back(std::async(
642  std::launch::async,
643  [=,
644  &has_null_per_thread,
645  &min_int64t_per_thread,
646  &max_int64t_per_thread,
647  &min_double_per_thread,
648  &max_double_per_thread,
649  &frag_offsets,
650  &rhs_values] {
651  SQLTypeInfo lhs_type = cd->columnType;
652 
653  // !! not sure if this is an undocumented convention or a bug, but for a sharded
654  // table the dictionary id of a dict-encoded string column is not specified by
655  // comp_param in the physical table but in the logical table: comp_param in the
656  // physical table is always 0, so we need to adapt accordingly...
657  auto cdl = (shard_ < 0)
658  ? cd
659  : catalog->getMetadataForColumn(
660  catalog->getLogicalTableId(td->tableId), cd->columnId);
661  CHECK(cdl);
662  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
663  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
664  lhs_type, &decimalOverflowValidator);
665  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
666  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
667  lhs_type, &dateDaysOverflowValidator);
668 
669  StringDictionary* stringDict{nullptr};
670  if (lhs_type.is_string()) {
671  CHECK(kENCODING_DICT == lhs_type.get_compression());
672  auto dictDesc = const_cast<DictDescriptor*>(
673  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
674  CHECK(dictDesc);
675  stringDict = dictDesc->stringDict.get();
676  CHECK(stringDict);
677  }
678 
679  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
680  const auto roffs = frag_offsets[r];
681  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
682  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
683  ScalarTargetValue sv2;
684 
685  // Subtle point about the two cases of string-to-string assignment: upstream may
686  // pass the RHS string as a string index instead of the preferred "real
687  // string".
688  // case #1. For "SET str_col = str_literal", a temporary string index is hard to
689  // resolve
690  // in this layer, so if upstream passes a string index here, an
691  // exception is thrown.
692  // case #2. For "SET str_col1 = str_col2", the RHS string index is converted to
693  // the corresponding LHS string index.
694  if (rhs_type.is_string()) {
695  if (const auto vp = boost::get<int64_t>(sv)) {
696  auto dictDesc = const_cast<DictDescriptor*>(
697  catalog->getMetadataForDict(rhs_type.get_comp_param()));
698  if (nullptr == dictDesc) {
699  throw std::runtime_error(
700  "UPDATE does not support cast from string literal to string "
701  "column.");
702  }
703  auto stringDict = dictDesc->stringDict.get();
704  CHECK(stringDict);
705  sv2 = NullableString(stringDict->getString(*vp));
706  sv = &sv2;
707  }
708  }
709 
710  if (const auto vp = boost::get<int64_t>(sv)) {
711  auto v = *vp;
712  if (lhs_type.is_string()) {
713  throw std::runtime_error("UPDATE does not support cast to string.");
714  }
715  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
716  if (lhs_type.is_decimal()) {
717  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
718  int64_t decimal_val;
719  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
720  tabulate_metadata(lhs_type,
721  min_int64t_per_thread[c],
722  max_int64t_per_thread[c],
723  has_null_per_thread[c],
724  (v == inline_int_null_value<int64_t>() &&
725  lhs_type.get_notnull() == false)
726  ? v
727  : decimal_val);
728  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
729  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
730  if (positive_v_and_negative_d || negative_v_and_positive_d) {
731  throw std::runtime_error(
732  "Data conversion overflow on " + std::to_string(v) +
733  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
734  std::to_string(rhs_type.get_scale()) + ") to (" +
735  std::to_string(lhs_type.get_dimension()) + ", " +
736  std::to_string(lhs_type.get_scale()) + ")");
737  }
738  } else if (is_integral(lhs_type)) {
739  if (lhs_type.is_date_in_days()) {
740  // Store meta values in seconds
741  if (lhs_type.get_size() == 2) {
742  nullAwareDateOverflowValidator.validate<int16_t>(v);
743  } else {
744  nullAwareDateOverflowValidator.validate<int32_t>(v);
745  }
746  int64_t days;
747  get_scalar<int64_t>(data_ptr, lhs_type, days);
748  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
749  tabulate_metadata(lhs_type,
750  min_int64t_per_thread[c],
751  max_int64t_per_thread[c],
752  has_null_per_thread[c],
753  (v == inline_int_null_value<int64_t>() &&
754  lhs_type.get_notnull() == false)
755  ? NullSentinelSupplier()(lhs_type, v)
756  : seconds);
757  } else {
758  int64_t target_value;
759  if (rhs_type.is_decimal()) {
760  target_value = round(decimal_to_double(rhs_type, v));
761  } else {
762  target_value = v;
763  }
764  tabulate_metadata(lhs_type,
765  min_int64t_per_thread[c],
766  max_int64t_per_thread[c],
767  has_null_per_thread[c],
768  target_value);
769  }
770  } else {
771  tabulate_metadata(
772  lhs_type,
773  min_double_per_thread[c],
774  max_double_per_thread[c],
775  has_null_per_thread[c],
776  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
777  }
778  } else if (const auto vp = boost::get<double>(sv)) {
779  auto v = *vp;
780  if (lhs_type.is_string()) {
781  throw std::runtime_error("UPDATE does not support cast to string.");
782  }
783  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
784  if (lhs_type.is_integer()) {
785  tabulate_metadata(lhs_type,
786  min_int64t_per_thread[c],
787  max_int64t_per_thread[c],
788  has_null_per_thread[c],
789  int64_t(v));
790  } else if (lhs_type.is_fp()) {
791  tabulate_metadata(lhs_type,
792  min_double_per_thread[c],
793  max_double_per_thread[c],
794  has_null_per_thread[c],
795  double(v));
796  } else {
797  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
798  "LHS with a floating RHS.";
799  }
800  } else if (const auto vp = boost::get<float>(sv)) {
801  auto v = *vp;
802  if (lhs_type.is_string()) {
803  throw std::runtime_error("UPDATE does not support cast to string.");
804  }
805  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
806  if (lhs_type.is_integer()) {
807  tabulate_metadata(lhs_type,
808  min_int64t_per_thread[c],
809  max_int64t_per_thread[c],
810  has_null_per_thread[c],
811  int64_t(v));
812  } else {
813  tabulate_metadata(lhs_type,
814  min_double_per_thread[c],
815  max_double_per_thread[c],
816  has_null_per_thread[c],
817  double(v));
818  }
819  } else if (const auto vp = boost::get<NullableString>(sv)) {
820  const auto s = boost::get<std::string>(vp);
821  const auto sval = s ? *s : std::string("");
822  if (lhs_type.is_string()) {
823  decltype(stringDict->getOrAdd(sval)) sidx;
824  {
825  std::unique_lock<std::mutex> lock(temp_mutex_);
826  sidx = stringDict->getOrAdd(sval);
827  }
828  put_scalar<int32_t>(data_ptr, lhs_type, sidx, cd->columnName);
829  tabulate_metadata(lhs_type,
830  min_int64t_per_thread[c],
831  max_int64t_per_thread[c],
832  has_null_per_thread[c],
833  int64_t(sidx));
834  } else if (sval.size() > 0) {
835  auto dval = std::atof(sval.data());
836  if (lhs_type.is_boolean()) {
837  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
838  } else if (lhs_type.is_time()) {
839  throw std::runtime_error(
840  "Date/Time/Timestamp update not supported through translated "
841  "string path.");
842  }
843  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
844  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
845  tabulate_metadata(lhs_type,
846  min_double_per_thread[c],
847  max_double_per_thread[c],
848  has_null_per_thread[c],
849  double(dval));
850  } else {
851  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
852  tabulate_metadata(lhs_type,
853  min_int64t_per_thread[c],
854  max_int64t_per_thread[c],
855  has_null_per_thread[c],
856  int64_t(dval));
857  }
858  } else {
859  put_null(data_ptr, lhs_type, cd->columnName);
860  has_null_per_thread[c] = true;
861  }
862  } else {
863  CHECK(false);
864  }
865  }
866  }));
867  if (threads.size() >= (size_t)cpu_threads()) {
868  wait_cleanup_threads(threads);
869  }
870  }
871  wait_cleanup_threads(threads);
872 
873  // for unit test
874  if (FragmentInfo::unconditionalVacuum_) {
875  if (cd->isDeletedCol) {
876  const auto deleted_offsets = getVacuumOffsets(chunk);
877  if (deleted_offsets.size() > 0) {
878  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
879  return;
880  }
881  }
882  }
883  bool has_null_per_chunk{false};
884  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
885  double min_double_per_chunk{std::numeric_limits<double>::max()};
886  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
887  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
888  for (size_t c = 0; c < ncore; ++c) {
889  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
890  max_double_per_chunk =
891  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
892  min_double_per_chunk =
893  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
894  max_int64t_per_chunk =
895  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
896  min_int64t_per_chunk =
897  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
898  }
899  updateColumnMetadata(cd,
900  fragment,
901  chunk,
902  has_null_per_chunk,
903  max_double_per_chunk,
904  min_double_per_chunk,
905  max_int64t_per_chunk,
906  min_int64t_per_chunk,
907  cd->columnType,
908  updel_roll);
909 }
910 
911 void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
912  FragmentInfo& fragment,
913  std::shared_ptr<Chunk_NS::Chunk> chunk,
914  const bool has_null_per_chunk,
915  const double max_double_per_chunk,
916  const double min_double_per_chunk,
917  const int64_t max_int64t_per_chunk,
918  const int64_t min_int64t_per_chunk,
919  const SQLTypeInfo& rhs_type,
920  UpdelRoll& updel_roll) {
921  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
922  auto key = std::make_pair(td, &fragment);
923  std::lock_guard<std::mutex> lck(updel_roll.mutex);
924  if (0 == updel_roll.chunkMetadata.count(key)) {
925  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
926  }
927  if (0 == updel_roll.numTuples.count(key)) {
928  updel_roll.numTuples[key] = fragment.shadowNumTuples;
929  }
930  auto& chunkMetadata = updel_roll.chunkMetadata[key];
931 
932  auto buffer = chunk->get_buffer();
933  const auto& lhs_type = cd->columnType;
934 
935  auto update_stats = [& encoder = buffer->encoder](auto min, auto max, auto has_null) {
936  static_assert(std::is_same<decltype(min), decltype(max)>::value,
937  "Type mismatch on min/max");
938  if (has_null) {
939  encoder->updateStats(decltype(min)(), true);
940  }
941  if (max < min) {
942  return;
943  }
944  encoder->updateStats(min, false);
945  encoder->updateStats(max, false);
946  };
947 
948  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
949  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
950  } else if (lhs_type.is_fp()) {
951  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
952  } else if (lhs_type.is_decimal()) {
953  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
954  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
955  has_null_per_chunk);
956  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
957  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
958  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
959  }
960  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
961 
962  // removed as @alex suggested; kept commented out in case we need to revisit it
963  // once the vacuum code is introduced: fragment.invalidateChunkMetadataMap();
964 }
965 
966 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
967  const MetaDataKey& key,
968  UpdelRoll& updel_roll) {
969  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
970  if (updel_roll.chunkMetadata.count(key)) {
971  auto& fragmentInfo = *key.second;
972  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
973  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
974  fragmentInfo.setChunkMetadataMap(chunkMetadata);
975  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
976  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
977  // TODO(ppan): When fragment-level compaction is enabled, the following code
978  // should suffice. When not (i.e. the existing code), we'll revert to updating
979  // InsertOrderFragmenter::varLenColInfo_
980  /*
981  for (const auto cit : chunkMetadata) {
982  const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
983  if (cd.columnType.get_size() < 0)
984  fragmentInfo.varLenColInfox[cd.columnId] = cit.second.numBytes;
985  }
986  */
987  }
988 }
989 
990 auto InsertOrderFragmenter::getChunksForAllColumns(
991  const TableDescriptor* td,
992  const FragmentInfo& fragment,
993  const Data_Namespace::MemoryLevel memory_level) {
994  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
995  // coming from updateColumn (on the '$delete$' column) we don't have chunks for all columns
996  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
997  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
998  ++ncol;
999  if (!cd->isVirtualCol) {
1000  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1001  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1002  ChunkKey chunk_key{
1003  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1004  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1005  &catalog_->getDataMgr(),
1006  chunk_key,
1007  memory_level,
1008  0,
1009  chunk_meta_it->second.numBytes,
1010  chunk_meta_it->second.numElements);
1011  chunks.push_back(chunk);
1012  }
1013  }
1014  }
1015  return chunks;
1016 }
1017 
1018 // get a sorted vector of offsets of rows to vacuum
1019 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1020  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1021  const auto data_buffer = chunk->get_buffer();
1022  const auto data_addr = data_buffer->getMemoryPtr();
1023  const size_t nrows_in_chunk = data_buffer->size();
1024  const size_t ncore = cpu_threads();
1025  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1026  std::vector<std::vector<uint64_t>> deleted_offsets;
1027  deleted_offsets.resize(ncore);
1028  std::vector<std::future<void>> threads;
1029  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1030  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1031  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1032  const auto ithread = rbegin / segsz;
1033  CHECK(ithread < deleted_offsets.size());
1034  deleted_offsets[ithread].reserve(segsz);
1035  for (size_t r = rbegin; r < rend; ++r) {
1036  if (data_addr[r]) {
1037  deleted_offsets[ithread].push_back(r);
1038  }
1039  }
1040  }));
1041  }
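 // Each worker scans one contiguous stripe of the delete-flag column in row order, so
 // concatenating the per-thread vectors in thread order below yields an already-sorted
 // offset list.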
1042  wait_cleanup_threads(threads);
1043  std::vector<uint64_t> all_deleted_offsets;
1044  for (size_t i = 0; i < ncore; ++i) {
1045  all_deleted_offsets.insert(
1046  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1047  }
1048  return all_deleted_offsets;
1049 }
1050 
1051 template <typename T>
1052 static void set_chunk_stats(const SQLTypeInfo& col_type,
1053  int8_t* data_addr,
1054  int8_t& has_null,
1055  T& min,
1056  T& max) {
1057  T v;
1058  const auto can_be_null = !col_type.get_notnull();
1059  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1060  if (is_null) {
1061  has_null = has_null || (can_be_null && is_null);
1062  } else {
1063  set_minmax(min, max, v);
1064  }
1065 }
1066 
1067 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1068  FragmentInfo& fragment,
1069  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1070  const size_t nrows_to_keep,
1071  UpdelRoll& updel_roll) {
1072  auto cd = chunk->get_column_desc();
1073  auto td = catalog->getMetadataForTable(cd->tableId);
1074  auto data_buffer = chunk->get_buffer();
1075  std::lock_guard<std::mutex> lck(updel_roll.mutex);
1076  const auto key = std::make_pair(td, &fragment);
1077  if (0 == updel_roll.chunkMetadata.count(key)) {
1078  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
1079  }
1080  auto& chunkMetadata = updel_roll.chunkMetadata[key];
1081  chunkMetadata[cd->columnId].numElements = nrows_to_keep;
1082  chunkMetadata[cd->columnId].numBytes = data_buffer->size();
1083  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
1084  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
1085  }
1086 }
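// Compacts a fixed-length chunk in place: every block of surviving rows between two
// vacuumed offsets is memmove'd toward the front of the data buffer. Returns the
// number of bytes of fixed-length data kept.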
1087 
1088 auto vacuum_fixlen_rows(
1089  const FragmentInfo& fragment,
1090  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1091  const std::vector<uint64_t>& frag_offsets) {
1092  const auto cd = chunk->get_column_desc();
1093  const auto& col_type = cd->columnType;
1094  auto data_buffer = chunk->get_buffer();
1095  auto data_addr = data_buffer->getMemoryPtr();
1096  auto element_size =
1097  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1098  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1099  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1100  size_t nbytes_fix_data_to_keep = 0;
1101  auto nrows_to_vacuum = frag_offsets.size();
1102  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1103  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1104  auto is_last_one = irow == nrows_to_vacuum;
1105  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1106  auto maddr_to_vacuum = data_addr;
1107  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1108  if (nrows_to_keep > 0) {
1109  auto nbytes_to_keep = nrows_to_keep * element_size;
1110  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1111  // move curr fixlen row block toward front
1112  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1113  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1114  nbytes_to_keep);
1115  }
1116  irow_of_blk_to_fill += nrows_to_keep;
1117  nbytes_fix_data_to_keep += nbytes_to_keep;
1118  }
1119  irow_of_blk_to_keep = irow_to_vacuum + 1;
1120  }
1121  return nbytes_fix_data_to_keep;
1122 }
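// Variable-length counterpart: the payload bytes are compacted first, then the string
// or array offset index is rewritten (rebased) so every kept row still points at its
// relocated payload. Returns the number of bytes of var-len payload kept.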
1123 
1124 auto vacuum_varlen_rows(
1125  const FragmentInfo& fragment,
1126  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1127  const std::vector<uint64_t>& frag_offsets) {
1128  auto data_buffer = chunk->get_buffer();
1129  auto index_buffer = chunk->get_index_buf();
1130  auto data_addr = data_buffer->getMemoryPtr();
1131  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1132  auto index_array = (StringOffsetT*)indices_addr;
1133  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1134  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1135  size_t nbytes_fix_data_to_keep = 0;
1136  size_t nbytes_var_data_to_keep = 0;
1137  auto nrows_to_vacuum = frag_offsets.size();
1138  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1139  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1140  auto is_last_one = irow == nrows_to_vacuum;
1141  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1142  auto maddr_to_vacuum = data_addr;
1143  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1144  if (nrows_to_keep > 0) {
1145  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1146  auto nbytes_to_keep =
1147  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1148  index_array[irow_of_blk_to_keep];
1149  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1150  // move curr varlen row block toward front
1151  memmove(data_addr + ibyte_var_data_to_keep,
1152  data_addr + index_array[irow_of_blk_to_keep],
1153  nbytes_to_keep);
1154 
1155  const auto index_base = index_array[irow_of_blk_to_keep];
1156  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1157  auto& index = index_array[irow_of_blk_to_keep + i];
1158  index = ibyte_var_data_to_keep + (index - index_base);
1159  }
1160  }
1161  nbytes_var_data_to_keep += nbytes_to_keep;
1162  maddr_to_vacuum = indices_addr;
1163 
1164  constexpr static auto index_element_size = sizeof(StringOffsetT);
1165  nbytes_to_keep = nrows_to_keep * index_element_size;
1166  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1167  // move curr fixlen row block toward front
1168  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1169  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1170  nbytes_to_keep);
1171  }
1172  irow_of_blk_to_fill += nrows_to_keep;
1173  nbytes_fix_data_to_keep += nbytes_to_keep;
1174  }
1175  irow_of_blk_to_keep = irow_to_vacuum + 1;
1176  }
1177  return nbytes_var_data_to_keep;
1178 }
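// Physically removes rows that were marked deleted: each column chunk is vacuumed in
// its own async task (fixed-length vs. var-len path), after which per-fragment tuple
// counts and chunk metadata are refreshed from the surviving rows.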
1179 
1180 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1181  const TableDescriptor* td,
1182  const int fragment_id,
1183  const std::vector<uint64_t>& frag_offsets,
1184  const Data_Namespace::MemoryLevel memory_level,
1185  UpdelRoll& updel_roll) {
1186  auto& fragment = getFragmentInfoFromId(fragment_id);
1187  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1188  const auto ncol = chunks.size();
1189 
1190  std::vector<int8_t> has_null_per_thread(ncol, 0);
1191  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1192  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1193  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
1194  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());
1195 
1196  // parallel delete columns
1197  std::vector<std::future<void>> threads;
1198  auto nrows_to_vacuum = frag_offsets.size();
1199  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1200  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1201 
1202  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1203  auto chunk = chunks[ci];
1204  const auto cd = chunk->get_column_desc();
1205  const auto& col_type = cd->columnType;
1206  auto data_buffer = chunk->get_buffer();
1207  auto index_buffer = chunk->get_index_buf();
1208  auto data_addr = data_buffer->getMemoryPtr();
1209  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1210  auto index_array = (StringOffsetT*)indices_addr;
1211  bool is_varlen = col_type.is_varlen_indeed();
1212 
1213  auto fixlen_vacuum = [=,
1214  &has_null_per_thread,
1215  &max_double_per_thread,
1216  &min_double_per_thread,
1217  &min_int64t_per_thread,
1218  &max_int64t_per_thread,
1219  &updel_roll,
1220  &frag_offsets,
1221  &fragment] {
1222  size_t nbytes_fix_data_to_keep;
1223  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1224 
1225  data_buffer->encoder->setNumElems(nrows_to_keep);
1226  data_buffer->setSize(nbytes_fix_data_to_keep);
1227  data_buffer->setUpdated();
1228 
1229  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1230 
1231  auto daddr = data_addr;
1232  auto element_size =
1233  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1234  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1235  if (col_type.is_fixlen_array()) {
1236  auto encoder =
1237  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1238  CHECK(encoder);
1239  encoder->updateMetadata((int8_t*)daddr);
1240  } else if (col_type.is_fp()) {
1241  set_chunk_stats(col_type,
1242  data_addr,
1243  has_null_per_thread[ci],
1244  min_double_per_thread[ci],
1245  max_double_per_thread[ci]);
1246  } else {
1247  set_chunk_stats(col_type,
1248  data_addr,
1249  has_null_per_thread[ci],
1250  min_int64t_per_thread[ci],
1251  max_int64t_per_thread[ci]);
1252  }
1253  }
1254  };
1255 
1256  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1257  size_t nbytes_var_data_to_keep;
1258  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1259 
1260  data_buffer->encoder->setNumElems(nrows_to_keep);
1261  data_buffer->setSize(nbytes_var_data_to_keep);
1262  data_buffer->setUpdated();
1263 
1264  index_array[nrows_to_keep] = data_buffer->size();
1265  index_buffer->setSize(sizeof(*index_array) *
1266  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1267  index_buffer->setUpdated();
1268 
1269  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1270  };
1271 
1272  if (is_varlen) {
1273  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1274  } else {
1275  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1276  }
1277  if (threads.size() >= (size_t)cpu_threads()) {
1278  wait_cleanup_threads(threads);
1279  }
1280  }
1281 
1282  wait_cleanup_threads(threads);
1283 
1284  auto key = std::make_pair(td, &fragment);
1285  updel_roll.numTuples[key] = nrows_to_keep;
1286  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1287  auto chunk = chunks[ci];
1288  auto cd = chunk->get_column_desc();
1289  if (!cd->columnType.is_fixlen_array()) {
1290  updateColumnMetadata(cd,
1291  fragment,
1292  chunk,
1293  has_null_per_thread[ci],
1294  max_double_per_thread[ci],
1295  min_double_per_thread[ci],
1296  max_int64t_per_thread[ci],
1297  min_int64t_per_thread[ci],
1298  cd->columnType,
1299  updel_roll);
1300  }
1301  }
1302 }
1303 
1304 } // namespace Fragmenter_Namespace
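// UpdelRoll::commitUpdate(): checkpoint the logical table when it lives on disk, push
// the shadow metadata captured during the update into every dirty fragment, and evict
// stale GPU copies of the chunks that were rewritten on the CPU.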
1305 
1306 void UpdelRoll::commitUpdate() {
1307  if (nullptr == catalog) {
1308  return;
1309  }
1310  const auto td = catalog->getMetadataForTable(logicalTableId);
1311  CHECK(td);
1312  // checkpoint all shards regardless, or the epoch gets out of sync
1313  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1314  catalog->checkpoint(logicalTableId);
1315  }
1316  // for each dirty fragment
1317  for (auto& cm : chunkMetadata) {
1318  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1319  }
1320  dirtyChunks.clear();
1321  // flush dirty GPU chunks if the update was not on GPU
1322  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1323  for (const auto& chunkey : dirtyChunkeys) {
1324  catalog->getDataMgr().deleteChunksWithPrefix(
1325  chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
1326  }
1327  }
1328 }
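// UpdelRoll::cancelUpdate(): for the var-len path, re-applying the current table epoch
// via setTableEpoch() discards the un-checkpointed changes; otherwise the dirty chunk
// buffers are freed when the update was staged at a non-persistent memory level.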
1329 
1330 void UpdelRoll::cancelUpdate() {
1331  if (nullptr == catalog) {
1332  return;
1333  }
1334 
1335  if (is_varlen_update) {
1336  int databaseId = catalog->getCurrentDB().dbId;
1337  int32_t tableEpoch = catalog->getTableEpoch(databaseId, logicalTableId);
1338 
1339  dirtyChunks.clear();
1340  const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpoch(
1341  databaseId, logicalTableId, tableEpoch);
1342  } else {
1343  const auto td = catalog->getMetadataForTable(logicalTableId);
1344  CHECK(td);
1345  if (td->persistenceLevel != memoryLevel) {
1346  for (auto dit : dirtyChunks) {
1347  catalog->getDataMgr().free(dit.first->get_buffer());
1348  dit.first->set_buffer(nullptr);
1349  }
1350  }
1351  }
1352 }