UpdelStorage.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <boost/variant.hpp>
#include <boost/variant/get.hpp>
#include <limits>
#include <mutex>
#include <string>
#include <vector>

#include "Catalog/Catalog.h"
#include "DataMgr/DataMgr.h"
#include "Fragmenter/InsertOrderFragmenter.h"
#include "Logger/Logger.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/TargetValueConvertersFactories.h"
#include "Shared/DateConverters.h"
#include "Shared/TypedDataAccessors.h"
#include "Shared/thread_count.h"

extern bool g_enable_experimental_string_functions;

namespace Fragmenter_Namespace {

inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
  for (auto& t : threads) {
    t.get();
  }
  threads.clear();
}

inline bool is_integral(const SQLTypeInfo& t) {
  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
}

void FragmentInfo::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                const std::string& tab_name,
                                const std::string& col_name,
                                const int fragment_id,
                                const std::vector<uint64_t>& frag_offsets,
                                const std::vector<ScalarTargetValue>& rhs_values,
                                const SQLTypeInfo& rhs_type,
                                const Data_Namespace::MemoryLevel memory_level,
                                UpdelRoll& updel_roll) {
  const auto td = catalog->getMetadataForTable(tab_name);
  CHECK(td);
  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
  CHECK(cd);
  td->fragmenter->updateColumn(catalog,
                               td,
                               cd,
                               fragment_id,
                               frag_offsets,
                               rhs_values,
                               rhs_type,
                               memory_level,
                               updel_roll);
}

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const ScalarTargetValue& rhs_value,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updateColumn(catalog,
               td,
               cd,
               fragment_id,
               frag_offsets,
               std::vector<ScalarTargetValue>(1, rhs_value),
               rhs_type,
               memory_level,
               updel_roll);
}

static int get_chunks(const Catalog_Namespace::Catalog* catalog,
                      const TableDescriptor* td,
                      const FragmentInfo& fragment,
                      const Data_Namespace::MemoryLevel memory_level,
                      std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
    if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
      ++nc;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks.size();
}

class ChunkToInsertDataConverter {
 public:
  virtual ~ChunkToInsertDataConverter() {}

  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;

  virtual void addDataBlocksToInsertData(
      Fragmenter_Namespace::InsertData& insertData) = 0;
};
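
// Converters are used in two phases: convertToColumnarFormat() is invoked once
// per surviving row to pull that row's value out of the chunk buffer, and
// addDataBlocksToInsertData() then attaches the accumulated column to an
// InsertData so the surviving rows can be re-appended through the insert path.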

template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
struct ScalarChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr =
      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
        checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~ScalarChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
    column_data_.get()[row] = insert_value;
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};
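
// For example, a BIGINT column stored with a 4-byte fixed encoding is handled
// by ScalarChunkConverter<int32_t, int64_t>: each 32-bit buffer value is
// widened back to its 64-bit logical value before re-insertion.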

struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
  int8_t* data_buffer_addr_;
  size_t fixed_array_length_;

  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
  }

  ~FixedLenArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);

    bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
                                                        src_value_ptr);

    (*column_data_)[row] = ArrayDatum(
        fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.arraysPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
  const StringOffsetT* index_buffer_addr_;

  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : FixedLenArrayChunkConverter(num_rows, chunk) {
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~ArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto startIndex = index_buffer_addr_[indexInFragment];
    auto endIndex = index_buffer_addr_[indexInFragment + 1];
    size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = ArrayDatum(
        src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
  }
};
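
// Illustration of the offset convention: with index entries [0, 24, -40],
// row 0 spans bytes [0, 24) and row 1 spans [24, 40) but is NULL; a negative
// end offset is the NULL marker, hence the std::abs() above.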

struct StringChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<std::string>> column_data_;
  const int8_t* data_buffer_addr_;
  const StringOffsetT* index_buffer_addr_;

  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
    data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
                                              : nullptr);
  }

  ~StringChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    size_t src_value_size =
        index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.stringsPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

template <typename BUFFER_DATA_TYPE>
struct DateChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;

  const Chunk_NS::Chunk* chunk_;
  ColumnDataPtr column_data_;
  const ColumnDescriptor* column_descriptor_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
    column_data_ = ColumnDataPtr(
        reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
  }

  ~DateChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<int64_t>(buffer_value);
    column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

void InsertOrderFragmenter::updateColumns(const Catalog_Namespace::Catalog* catalog,
                                          const TableDescriptor* td,
                                          const int fragmentId,
                                          const std::vector<TargetMetaInfo> sourceMetaInfo,
                                          const std::vector<const ColumnDescriptor*> columnDescriptors,
                                          const RowDataProvider& sourceDataProvider,
                                          const size_t indexOffFragmentOffsetColumn,
                                          const Data_Namespace::MemoryLevel memoryLevel,
                                          UpdelRoll& updelRoll,
                                          Executor* executor) {
  updelRoll.is_varlen_update = true;
  updelRoll.catalog = catalog;
  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updelRoll.memoryLevel = memoryLevel;

  size_t num_entries = sourceDataProvider.getEntryCount();
  size_t num_rows = sourceDataProvider.getRowCount();

  if (0 == num_rows) {
    // bail out early
    return;
  }

  TargetValueConverterFactory factory;

  auto fragment_ptr = getFragmentInfo(fragmentId);
  auto& fragment = *fragment_ptr;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;

  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->getColumnDesc();

    if (chunk_cd->isDeletedCol) {
      deletedChunk = chunk;
      continue;
    }

    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });

    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);

      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];

      ConverterCreateParameter param{
          num_rows,
          *catalog,
          sourceDataMetaInfo,
          targetDescriptor,
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
          sourceDataProvider.getLiteralDictionary(),
          g_enable_experimental_string_functions
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().get_comp_param(),
                    executor->getRowSetMemoryOwner(),
                    true)
              : nullptr};
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);

      if (targetDescriptor->columnType.is_geometry()) {
        // geometry columns are composites; we need to skip the physical chunks
        // that follow, depending on the geo type
        switch (targetDescriptor->columnType.get_type()) {
          case kMULTIPOLYGON:
            indexOfChunk += 5;
            break;
          case kPOLYGON:
            indexOfChunk += 4;
            break;
          case kLINESTRING:
            indexOfChunk += 2;
            break;
          case kPOINT:
            indexOfChunk += 1;
            break;
          default:
            CHECK(false);  // not supported
        }
      }
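      // For example (assuming the standard geo physical-column layout), a
      // POLYGON column is followed by four physical chunks (coords, ring
      // sizes, bounds, render group), hence the += 4 above, while a POINT is
      // followed only by its coords chunk.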
    } else {
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;

        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          // the logical geo column is a string column
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }

        chunkConverters.push_back(std::move(converter));

      } else if (chunk_cd->columnType.is_date_in_days()) {
        /* Q: Why do we need this?
           A: In the variable-length update path we move chunk content without
              decoding it. Because the data passes through DateDaysEncoder
              again, the encoder expects values in seconds, but here they are
              stored in days. DateChunkConverter therefore scales the chunk
              values from days to seconds, and DateDaysEncoder ultimately
              encodes them back to days. */
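        // Worked example: a date-in-days buffer value of 18262 (2020-01-01)
        // becomes 18262 * 86400 = 1577836800 epoch seconds here, and
        // DateDaysEncoder later re-encodes that back to days on insert.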
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
        } else {
          CHECK(false);
        }
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();

        if (logical_type.is_string()) {
          // for dictionary-encoded strings, logical size == physical size
          logical_size = physical_size;
        }

        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        } else {
          CHECK(false);  // unknown physical size
        }

        chunkConverters.push_back(std::move(converter));
      }
    }
  }

  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;

  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
                   deletedChunk->getColumnDesc()->tableId,
                   deletedChunk->getColumnDesc()->columnId,
                   fragment.fragmentId};
  updelRoll.dirtyChunkeys.insert(chunkey);
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());

  std::atomic<size_t> row_idx{0};

  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        &chunkConverters,
                        &deletedChunkBuffer,
                        &row_idx](size_t indexOfEntry) -> void {
    // convert the source data
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
    if (row.empty()) {
      return;
    }

    size_t indexOfRow = row_idx.fetch_add(1);

    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      }
    }

    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);

    // convert the remaining chunks
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
    }

    // now mark the row as deleted
    deletedChunkBuffer[indexInChunkBuffer] = true;
  };

  bool can_go_parallel = num_rows > 20000;

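  // Entries are dealt to workers in contiguous stripes. For example, with
  // num_entries = 100000 on 8 cores, stride = 12500 and worker i scans
  // entries [i * 12500, min((i + 1) * 12500, num_entries)).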
  if (can_go_parallel) {
    const size_t num_worker_threads = cpu_threads();
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, num_entries);
      worker_threads.push_back(std::async(
          std::launch::async,
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfEntry = start; indexOfEntry < end; ++indexOfEntry) {
              row_converter(indexOfEntry);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : worker_threads) {
      child.wait();
    }

  } else {
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
    }
  }

  InsertData insert_data;
  insert_data.databaseId = catalog->getCurrentDB().dbId;
  insert_data.tableId = td->tableId;

  for (size_t i = 0; i < chunkConverters.size(); i++) {
    chunkConverters[i]->addDataBlocksToInsertData(insert_data);
  }

  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
    if (sourceDataConverters[i]) {
      sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
    }
  }

  insert_data.numRows = num_rows;
  insertDataNoCheckpoint(insert_data);

  // update metadata
  if (!deletedChunk->getBuffer()->has_encoder) {
    deletedChunk->initEncoder();
  }
  deletedChunk->getBuffer()->encoder->updateStats(static_cast<int64_t>(true), false);

  auto& shadowDeletedChunkMeta =
      fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
  if (shadowDeletedChunkMeta->numElements >
      deletedChunk->getBuffer()->encoder->getNumElems()) {
    // the append will have populated the shadow metadata; otherwise keep the
    // existing element count
    deletedChunk->getBuffer()->encoder->setNumElems(shadowDeletedChunkMeta->numElements);
  }
  deletedChunk->getBuffer()->setUpdated();
}

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const std::vector<ScalarTargetValue>& rhs_values,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updel_roll.catalog = catalog;
  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updel_roll.memoryLevel = memory_level;

  const size_t ncore = cpu_threads();
  const auto nrow = frag_offsets.size();
  const auto n_rhs_values = rhs_values.size();
  if (0 == nrow) {
    return;
  }
  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);

  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
  ChunkKey chunk_key{
      catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
  auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                         &catalog->getDataMgr(),
                                         chunk_key,
                                         memory_level,
                                         0,
                                         chunk_meta_it->second->numBytes,
                                         chunk_meta_it->second->numElements);

  std::vector<int8_t> has_null_per_thread(ncore, 0);
  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());

  // parallel update elements
  std::vector<std::future<void>> threads;

  const auto segsz = (nrow + ncore - 1) / ncore;
  auto dbuf = chunk->getBuffer();
  auto dbuf_addr = dbuf->getMemoryPtr();
  dbuf->setUpdated();
  {
    std::lock_guard<std::mutex> lck(updel_roll.mutex);
    if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
      updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
    }

    ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
                     cd->tableId,
                     cd->columnId,
                     fragment.fragmentId};
    updel_roll.dirtyChunkeys.insert(chunkey);
  }
  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
    threads.emplace_back(std::async(
        std::launch::async,
        [=,
         &has_null_per_thread,
         &min_int64t_per_thread,
         &max_int64t_per_thread,
         &min_double_per_thread,
         &max_double_per_thread,
         &frag_offsets,
         &rhs_values] {
          SQLTypeInfo lhs_type = cd->columnType;

          // NOTE: it is unclear whether this is an undocumented convention or
          // a bug, but for a sharded table the dictionary id of a dict-encoded
          // string column is specified by comp_param on the logical table, not
          // on the physical table (where comp_param is always 0), so we need
          // to adapt accordingly.
          auto cdl = (shard_ < 0)
                         ? cd
                         : catalog->getMetadataForColumn(
                               catalog->getLogicalTableId(td->tableId), cd->columnId);
          CHECK(cdl);
          DecimalOverflowValidator decimalOverflowValidator(lhs_type);
          NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
              lhs_type, &decimalOverflowValidator);
          DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
          NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
              lhs_type, &dateDaysOverflowValidator);

          StringDictionary* stringDict{nullptr};
          if (lhs_type.is_string()) {
            CHECK(kENCODING_DICT == lhs_type.get_compression());
            auto dictDesc = const_cast<DictDescriptor*>(
                catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
            CHECK(dictDesc);
            stringDict = dictDesc->stringDict.get();
            CHECK(stringDict);
          }

          for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
            const auto roffs = frag_offsets[r];
            auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
            auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
            ScalarTargetValue sv2;

            // A subtlety in the two string-to-string assignment cases:
            // upstream may pass the RHS string as a string index instead of a
            // preferred "real string".
            // Case #1, "SET str_col = str_literal": a temporary string index
            // cannot be resolved in this layer, so if upstream passes an index
            // here, an exception is thrown.
            // Case #2, "SET str_col1 = str_col2": the RHS string index is
            // translated to the LHS string index.
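            // For example, a (hypothetical) "UPDATE t SET str_col1 = str_col2;"
            // arrives here with the RHS as str_col2's dictionary index; it is
            // resolved to the actual string below and then re-encoded into
            // str_col1's dictionary.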
            if (rhs_type.is_string()) {
              if (const auto vp = boost::get<int64_t>(sv)) {
                auto dictDesc = const_cast<DictDescriptor*>(
                    catalog->getMetadataForDict(rhs_type.get_comp_param()));
                if (nullptr == dictDesc) {
                  throw std::runtime_error(
                      "UPDATE does not support cast from string literal to string "
                      "column.");
                }
                auto stringDict = dictDesc->stringDict.get();
                CHECK(stringDict);
                sv2 = NullableString(stringDict->getString(*vp));
                sv = &sv2;
              }
            }

            if (const auto vp = boost::get<int64_t>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
              if (lhs_type.is_decimal()) {
                nullAwareDecimalOverflowValidator.validate<int64_t>(v);
                int64_t decimal_val;
                get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  (v == inline_int_null_value<int64_t>() &&
                                   lhs_type.get_notnull() == false)
                                      ? v
                                      : decimal_val);
                auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
                auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
                if (positive_v_and_negative_d || negative_v_and_positive_d) {
                  throw std::runtime_error(
                      "Data conversion overflow on " + std::to_string(v) +
                      " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
                      std::to_string(rhs_type.get_scale()) + ") to (" +
                      std::to_string(lhs_type.get_dimension()) + ", " +
                      std::to_string(lhs_type.get_scale()) + ")");
                }
              } else if (is_integral(lhs_type)) {
                if (lhs_type.is_date_in_days()) {
                  // store meta values in seconds
                  if (lhs_type.get_size() == 2) {
                    nullAwareDateOverflowValidator.validate<int16_t>(v);
                  } else {
                    nullAwareDateOverflowValidator.validate<int32_t>(v);
                  }
                  int64_t days;
                  get_scalar<int64_t>(data_ptr, lhs_type, days);
                  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    (v == inline_int_null_value<int64_t>() &&
                                     lhs_type.get_notnull() == false)
                                        ? NullSentinelSupplier()(lhs_type, v)
                                        : seconds);
                } else {
                  int64_t target_value;
                  if (rhs_type.is_decimal()) {
                    target_value = round(decimal_to_double(rhs_type, v));
                  } else {
                    target_value = v;
                  }
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    target_value);
                }
              } else {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v)
                                                        : v);
              }
            } else if (const auto vp = boost::get<double>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else if (lhs_type.is_fp()) {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              } else {
                UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                                 "LHS with a floating RHS.";
              }
            } else if (const auto vp = boost::get<float>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              }
            } else if (const auto vp = boost::get<NullableString>(sv)) {
              const auto s = boost::get<std::string>(vp);
              const auto sval = s ? *s : std::string("");
              if (lhs_type.is_string()) {
                decltype(stringDict->getOrAdd(sval)) sidx;
                {
                  std::unique_lock<std::mutex> lock(temp_mutex_);
                  sidx = stringDict->getOrAdd(sval);
                }
                put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(sidx));
              } else if (sval.size() > 0) {
                auto dval = std::atof(sval.data());
                if (lhs_type.is_boolean()) {
                  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
                } else if (lhs_type.is_time()) {
                  throw std::runtime_error(
                      "Date/Time/Timestamp update not supported through translated "
                      "string path.");
                }
                if (lhs_type.is_fp() || lhs_type.is_decimal()) {
                  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_double_per_thread[c],
                                    max_double_per_thread[c],
                                    has_null_per_thread[c],
                                    double(dval));
                } else {
                  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    int64_t(dval));
                }
              } else {
                put_null(data_ptr, lhs_type, cd->columnName);
                has_null_per_thread[c] = true;
              }
            } else {
              CHECK(false);
            }
          }
        }));
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }
  wait_cleanup_threads(threads);

  // for unit test
  if (FragmentInfo::unconditionalVacuum_) {
    if (cd->isDeletedCol) {
      const auto deleted_offsets = getVacuumOffsets(chunk);
      if (deleted_offsets.size() > 0) {
        compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
        return;
      }
    }
  }
  bool has_null_per_chunk{false};
  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
  double min_double_per_chunk{std::numeric_limits<double>::max()};
  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
  for (size_t c = 0; c < ncore; ++c) {
    has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
    max_double_per_chunk =
        std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
    min_double_per_chunk =
        std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
    max_int64t_per_chunk =
        std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
    min_int64t_per_chunk =
        std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
  }
  updateColumnMetadata(cd,
                       fragment,
                       chunk,
                       has_null_per_chunk,
                       max_double_per_chunk,
                       min_double_per_chunk,
                       max_int64t_per_chunk,
                       min_int64t_per_chunk,
                       cd->columnType,
                       updel_roll);
}

void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
                                                 FragmentInfo& fragment,
                                                 std::shared_ptr<Chunk_NS::Chunk> chunk,
                                                 const bool has_null_per_chunk,
                                                 const double max_double_per_chunk,
                                                 const double min_double_per_chunk,
                                                 const int64_t max_int64t_per_chunk,
                                                 const int64_t min_int64t_per_chunk,
                                                 const SQLTypeInfo& rhs_type,
                                                 UpdelRoll& updel_roll) {
  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
  auto key = std::make_pair(td, &fragment);
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  if (0 == updel_roll.numTuples.count(key)) {
    updel_roll.numTuples[key] = fragment.shadowNumTuples;
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];

  auto buffer = chunk->getBuffer();
  const auto& lhs_type = cd->columnType;

  auto encoder = buffer->encoder.get();
  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
    if (has_null) {
      encoder->updateStats(decltype(min)(), true);
    }
    if (max < min) {
      return;
    }
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
  };

  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_fp()) {
    update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_decimal()) {
    update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
                 (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
                 has_null_per_chunk);
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  }
  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);

  // Removed as @alex suggested; kept commented out in case we revisit this once
  // the vacuum code is introduced. fragment.invalidateChunkMetadataMap();
}

void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
                                           const MetaDataKey& key,
                                           UpdelRoll& updel_roll) {
  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  if (updel_roll.chunkMetadata.count(key)) {
    auto& fragmentInfo = *key.second;
    const auto& chunkMetadata = updel_roll.chunkMetadata[key];
    fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
    fragmentInfo.setChunkMetadataMap(chunkMetadata);
    fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
    fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
  }
}

auto InsertOrderFragmenter::getChunksForAllColumns(
    const TableDescriptor* td,
    const FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel memory_level) {
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  // when coming from updateColumn (on the '$delete$' column) we don't have
  // chunks for all columns
  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
    if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
      ++ncol;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second->numBytes,
                                               chunk_meta_it->second->numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks;
}

// get a sorted vector of offsets of rows to vacuum
const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
    const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
  const auto data_buffer = chunk->getBuffer();
  const auto data_addr = data_buffer->getMemoryPtr();
  const size_t nrows_in_chunk = data_buffer->size();
  const size_t ncore = cpu_threads();
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
  std::vector<std::vector<uint64_t>> deleted_offsets;
  deleted_offsets.resize(ncore);
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
      const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
      const auto ithread = rbegin / segsz;
      CHECK(ithread < deleted_offsets.size());
      deleted_offsets[ithread].reserve(segsz);
      for (size_t r = rbegin; r < rend; ++r) {
        if (data_addr[r]) {
          deleted_offsets[ithread].push_back(r);
        }
      }
    }));
  }
  wait_cleanup_threads(threads);
  std::vector<uint64_t> all_deleted_offsets;
  for (size_t i = 0; i < ncore; ++i) {
    all_deleted_offsets.insert(
        all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
  }
  return all_deleted_offsets;
}
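
// For example, if rows 3, 7 and 9 of a fragment carry the deleted flag, the
// result is {3, 7, 9}: the per-segment vectors are built in parallel and
// concatenated in segment order, so the returned offsets are already sorted.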

template <typename T>
static void set_chunk_stats(const SQLTypeInfo& col_type,
                            int8_t* data_addr,
                            int8_t& has_null,
                            T& min,
                            T& max) {
  T v;
  const auto can_be_null = !col_type.get_notnull();
  const auto is_null = get_scalar<T>(data_addr, col_type, v);
  if (is_null) {
    has_null = has_null || (can_be_null && is_null);
  } else {
    set_minmax(min, max, v);
  }
}

static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
                               FragmentInfo& fragment,
                               const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                               const size_t nrows_to_keep,
                               UpdelRoll& updel_roll) {
  auto cd = chunk->getColumnDesc();
  auto td = catalog->getMetadataForTable(cd->tableId);
  auto data_buffer = chunk->getBuffer();
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  const auto key = std::make_pair(td, &fragment);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];
  chunkMetadata[cd->columnId]->numElements = nrows_to_keep;
  chunkMetadata[cd->columnId]->numBytes = data_buffer->size();
  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
    updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
  }
}

auto vacuum_fixlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  const auto cd = chunk->getColumnDesc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->getBuffer();
  auto data_addr = data_buffer->getMemoryPtr();
  auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}
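
// Worked example: with 10 rows and frag_offsets = {3, 7}, the kept blocks
// [0, 3), [4, 7) and [8, 10) are memmoved up against each other, leaving
// 8 rows and returning 8 * element_size bytes.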

auto vacuum_varlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  auto data_buffer = chunk->getBuffer();
  auto index_buffer = chunk->getIndexBuf();
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
  auto index_array = (StringOffsetT*)indices_addr;
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  size_t nbytes_var_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
          index_array[irow_of_blk_to_keep];
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr varlen row block toward front
        memmove(data_addr + ibyte_var_data_to_keep,
                data_addr + index_array[irow_of_blk_to_keep],
                nbytes_to_keep);

        const auto index_base = index_array[irow_of_blk_to_keep];
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto& index = index_array[irow_of_blk_to_keep + i];
          index = ibyte_var_data_to_keep + (index - index_base);
        }
      }
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;

      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_var_data_to_keep;
}
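
// Same block-compaction scheme as vacuum_fixlen_rows, except that both the
// payload and the offset array are compacted, and each kept offset is rebased
// by (ibyte_var_data_to_keep - index_base) so it points into the moved payload.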

void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
                                        const TableDescriptor* td,
                                        const int fragment_id,
                                        const std::vector<uint64_t>& frag_offsets,
                                        const Data_Namespace::MemoryLevel memory_level,
                                        UpdelRoll& updel_roll) {
  auto fragment_ptr = getFragmentInfo(fragment_id);
  auto& fragment = *fragment_ptr;
  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
  const auto ncol = chunks.size();

  std::vector<int8_t> has_null_per_thread(ncol, 0);
  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());

  // parallel delete columns
  std::vector<std::future<void>> threads;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;

  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    const auto cd = chunk->getColumnDesc();
    const auto& col_type = cd->columnType;
    auto data_buffer = chunk->getBuffer();
    auto index_buffer = chunk->getIndexBuf();
    auto data_addr = data_buffer->getMemoryPtr();
    auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
    auto index_array = (StringOffsetT*)indices_addr;
    bool is_varlen = col_type.is_varlen_indeed();

    auto fixlen_vacuum = [=,
                          &has_null_per_thread,
                          &max_double_per_thread,
                          &min_double_per_thread,
                          &min_int64t_per_thread,
                          &max_int64t_per_thread,
                          &updel_roll,
                          &frag_offsets,
                          &fragment] {
      size_t nbytes_fix_data_to_keep;
      nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);

      data_buffer->encoder->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_fix_data_to_keep);
      data_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);

      auto daddr = data_addr;
      auto element_size =
          col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
      for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
        if (col_type.is_fixlen_array()) {
          auto encoder =
              dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
          CHECK(encoder);
          encoder->updateMetadata((int8_t*)daddr);
        } else if (col_type.is_fp()) {
          set_chunk_stats(col_type,
                          daddr,
                          has_null_per_thread[ci],
                          min_double_per_thread[ci],
                          max_double_per_thread[ci]);
        } else {
          set_chunk_stats(col_type,
                          daddr,
                          has_null_per_thread[ci],
                          min_int64t_per_thread[ci],
                          max_int64t_per_thread[ci]);
        }
      }
    };

    auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_var_data_to_keep;
      nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);

      data_buffer->encoder->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_var_data_to_keep);
      data_buffer->setUpdated();

      index_array[nrows_to_keep] = data_buffer->size();
      index_buffer->setSize(sizeof(*index_array) *
                            (nrows_to_keep ? 1 + nrows_to_keep : 0));
      index_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
    };

    if (is_varlen) {
      threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
    } else {
      threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
    }
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }

  wait_cleanup_threads(threads);

  auto key = std::make_pair(td, &fragment);
  updel_roll.numTuples[key] = nrows_to_keep;
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    auto cd = chunk->getColumnDesc();
    if (!cd->columnType.is_fixlen_array()) {
      updateColumnMetadata(cd,
                           fragment,
                           chunk,
                           has_null_per_thread[ci],
                           max_double_per_thread[ci],
                           min_double_per_thread[ci],
                           max_int64t_per_thread[ci],
                           min_int64t_per_thread[ci],
                           cd->columnType,
                           updel_roll);
    }
  }
}

}  // namespace Fragmenter_Namespace

void UpdelRoll::commitUpdate() {
  if (nullptr == catalog) {
    return;
  }
  const auto td = catalog->getMetadataForTable(logicalTableId);
  CHECK(td);
  // checkpoint all shards regardless, or the epoch becomes out of sync
  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    catalog->checkpoint(logicalTableId);
  }
  // for each dirty fragment
  for (auto& cm : chunkMetadata) {
    cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
  }
  dirtyChunks.clear();
  // flush gpu dirty chunks if update was not on gpu
  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
    for (const auto& chunkey : dirtyChunkeys) {
      catalog->getDataMgr().deleteChunksWithPrefix(
          chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
    }
  }
}

void UpdelRoll::cancelUpdate() {
  if (nullptr == catalog) {
    return;
  }

  if (is_varlen_update) {
    int databaseId = catalog->getCurrentDB().dbId;
    int32_t tableEpoch = catalog->getTableEpoch(databaseId, logicalTableId);

    dirtyChunks.clear();
    const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpoch(
        databaseId, logicalTableId, tableEpoch);
  } else {
    const auto td = catalog->getMetadataForTable(logicalTableId);
    CHECK(td);
    if (td->persistenceLevel != memoryLevel) {
      for (auto dit : dirtyChunks) {
        catalog->getDataMgr().free(dit.first->getBuffer());
        dit.first->setBuffer(nullptr);
      }
    }
  }
}