OmniSciDB  ab4938a6a3
UpdelStorage.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <boost/variant.hpp>
18 #include <boost/variant/get.hpp>
19 #include <limits>
20 #include <mutex>
21 #include <string>
22 #include <vector>
23 
24 #include "Catalog/Catalog.h"
25 #include "DataMgr/DataMgr.h"
28 #include "QueryEngine/Execute.h"
30 #include "Shared/DateConverters.h"
31 #include "Shared/Logger.h"
33 #include "Shared/thread_count.h"
35 
37 
38 namespace Fragmenter_Namespace {
39 
40 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
41  for (auto& t : threads) {
42  t.get();
43  }
44  threads.clear();
45 }
46 
47 inline bool is_integral(const SQLTypeInfo& t) {
48  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
49 }
50 
51 bool FragmentInfo::unconditionalVacuum_{false};
52 
53 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
54  const std::string& tab_name,
55  const std::string& col_name,
56  const int fragment_id,
57  const std::vector<uint64_t>& frag_offsets,
58  const std::vector<ScalarTargetValue>& rhs_values,
59  const SQLTypeInfo& rhs_type,
60  const Data_Namespace::MemoryLevel memory_level,
61  UpdelRoll& updel_roll) {
62  const auto td = catalog->getMetadataForTable(tab_name);
63  CHECK(td);
64  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
65  CHECK(cd);
66  td->fragmenter->updateColumn(catalog,
67  td,
68  cd,
69  fragment_id,
70  frag_offsets,
71  rhs_values,
72  rhs_type,
73  memory_level,
74  updel_roll);
75 }
76 
77 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
78  const TableDescriptor* td,
79  const ColumnDescriptor* cd,
80  const int fragment_id,
81  const std::vector<uint64_t>& frag_offsets,
82  const ScalarTargetValue& rhs_value,
83  const SQLTypeInfo& rhs_type,
84  const Data_Namespace::MemoryLevel memory_level,
85  UpdelRoll& updel_roll) {
86  updateColumn(catalog,
87  td,
88  cd,
89  fragment_id,
90  frag_offsets,
91  std::vector<ScalarTargetValue>(1, rhs_value),
92  rhs_type,
93  memory_level,
94  updel_roll);
95 }
96 
97 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
98  const TableDescriptor* td,
99  const FragmentInfo& fragment,
100  const Data_Namespace::MemoryLevel memory_level,
101  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
102  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
103  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
104  ++nc;
105  if (!cd->isVirtualCol) {
106  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
107  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
108  ChunkKey chunk_key{
109  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
110  auto chunk = Chunk_NS::Chunk::getChunk(cd,
111  &catalog->getDataMgr(),
112  chunk_key,
113  memory_level,
114  0,
115  chunk_meta_it->second->numBytes,
116  chunk_meta_it->second->numElements);
117  chunks.push_back(chunk);
118  }
119  }
120  }
121  return chunks.size();
122 }
123 
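// The converter classes below support the variable-length UPDATE path
// (updateColumns below): each one reads values directly out of an existing
// chunk buffer and repackages them as a DataBlockPtr for an InsertData batch.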
124 class ChunkToInsertDataConverter {
125  public:
126  virtual ~ChunkToInsertDataConverter() {}
127 
128  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
129 
130  virtual void addDataBlocksToInsertData(
131  Fragmenter_Namespace::InsertData& insertData) = 0;
132 };
133 
134 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
135 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
136  using ColumnDataPtr =
137  std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
138 
139  const Chunk_NS::Chunk* chunk_;
140  ColumnDataPtr column_data_;
141  const ColumnDescriptor* column_descriptor_;
142  const BUFFER_DATA_TYPE* data_buffer_addr_;
143 
144  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
145  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
146  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
147  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
148  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
149  }
150 
151  ~ScalarChunkConverter() override {}
152 
153  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
154  auto buffer_value = data_buffer_addr_[indexInFragment];
155  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
156  column_data_.get()[row] = insert_value;
157  }
158 
159  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
160  DataBlockPtr dataBlock;
161  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
162  insertData.data.push_back(dataBlock);
163  insertData.columnIds.push_back(column_descriptor_->columnId);
164  }
165 };
166 
167 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
168  const Chunk_NS::Chunk* chunk_;
169  const ColumnDescriptor* column_descriptor_;
170 
171  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
172  int8_t* data_buffer_addr_;
173  size_t fixed_array_length_;
174 
175  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
176  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
177  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
178  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
179  fixed_array_length_ = chunk->getColumnDesc()->columnType.get_size();
180  }
181 
182  ~FixedLenArrayChunkConverter() override {}
183 
184  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
185  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
186 
187  bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
188  src_value_ptr);
189 
190  (*column_data_)[row] = ArrayDatum(
191  fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
192  }
193 
194  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
195  DataBlockPtr dataBlock;
196  dataBlock.arraysPtr = column_data_.get();
197  insertData.data.push_back(dataBlock);
198  insertData.columnIds.push_back(column_descriptor_->columnId);
199  }
200 };
201 
202 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
203  StringOffsetT* index_buffer_addr_;
204 
205  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
206  : FixedLenArrayChunkConverter(num_rows, chunk) {
207  index_buffer_addr_ =
208  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
209  : nullptr);
210  }
211 
212  ~ArrayChunkConverter() override {}
213 
214  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
215  auto startIndex = index_buffer_addr_[indexInFragment];
216  auto endIndex = index_buffer_addr_[indexInFragment + 1];
217  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
218  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
219  (*column_data_)[row] = ArrayDatum(
220  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
221  }
222 };
223 
224 struct StringChunkConverter : public ChunkToInsertDataConverter {
225  const Chunk_NS::Chunk* chunk_;
226  const ColumnDescriptor* column_descriptor_;
227 
228  std::unique_ptr<std::vector<std::string>> column_data_;
229  const int8_t* data_buffer_addr_;
230  const StringOffsetT* index_buffer_addr_;
231 
232  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
233  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
234  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
235  data_buffer_addr_ = chunk->getBuffer()->getMemoryPtr();
236  index_buffer_addr_ =
237  (StringOffsetT*)(chunk->getIndexBuf() ? chunk->getIndexBuf()->getMemoryPtr()
238  : nullptr);
239  }
240 
241  ~StringChunkConverter() override {}
242 
243  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
244  size_t src_value_size =
245  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
246  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
247  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
248  }
249 
250  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
251  DataBlockPtr dataBlock;
252  dataBlock.stringsPtr = column_data_.get();
253  insertData.data.push_back(dataBlock);
254  insertData.columnIds.push_back(column_descriptor_->columnId);
255  }
256 };
257 
258 template <typename BUFFER_DATA_TYPE>
259 struct DateChunkConverter : public ChunkToInsertDataConverter {
260  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
261 
262  const Chunk_NS::Chunk* chunk_;
263  ColumnDataPtr column_data_;
264  const ColumnDescriptor* column_descriptor_;
265  const BUFFER_DATA_TYPE* data_buffer_addr_;
266 
267  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
268  : chunk_(chunk), column_descriptor_(chunk->getColumnDesc()) {
269  column_data_ = ColumnDataPtr(
270  reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
271  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->getBuffer()->getMemoryPtr();
272  }
273 
274  ~DateChunkConverter() override {}
275 
276  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
277  auto buffer_value = data_buffer_addr_[indexInFragment];
278  auto insert_value = static_cast<int64_t>(buffer_value);
279  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
280  }
281 
282  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
283  DataBlockPtr dataBlock;
284  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
285  insertData.data.push_back(dataBlock);
286  insertData.columnIds.push_back(column_descriptor_->columnId);
287  }
288 };
289 
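// updateColumns() implements variable-length UPDATE as delete-plus-append:
// target columns are converted from the source data, untouched columns are
// copied out of their existing chunks via the converters above, the resulting
// rows are appended with insertDataNoCheckpoint(), and the original rows are
// flagged in the fragment's delete column.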
290 void InsertOrderFragmenter::updateColumns(
291  const Catalog_Namespace::Catalog* catalog,
292  const TableDescriptor* td,
293  const int fragmentId,
294  const std::vector<TargetMetaInfo> sourceMetaInfo,
295  const std::vector<const ColumnDescriptor*> columnDescriptors,
296  const RowDataProvider& sourceDataProvider,
297  const size_t indexOffFragmentOffsetColumn,
298  const Data_Namespace::MemoryLevel memoryLevel,
299  UpdelRoll& updelRoll) {
300  updelRoll.is_varlen_update = true;
301  updelRoll.catalog = catalog;
302  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
303  updelRoll.memoryLevel = memoryLevel;
304 
305  size_t num_entries = sourceDataProvider.getEntryCount();
306  size_t num_rows = sourceDataProvider.getRowCount();
307 
308  if (0 == num_rows) {
309  // bail out early
310  return;
311  }
312 
313  TargetValueConverterFactory factory;
314 
315  auto fragment_ptr = getFragmentInfo(fragmentId);
316  auto& fragment = *fragment_ptr;
317  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
318  get_chunks(catalog, td, fragment, memoryLevel, chunks);
319  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
320  columnDescriptors.size());
321  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
322  std::shared_ptr<Executor> executor;
323 
323 
324  if (g_enable_experimental_string_functions) {
325  executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
326  }
327 
328  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
329  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
330  auto chunk = chunks[indexOfChunk];
331  const auto chunk_cd = chunk->getColumnDesc();
332 
333  if (chunk_cd->isDeletedCol) {
334  deletedChunk = chunk;
335  continue;
336  }
337 
338  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
339  columnDescriptors.end(),
340  [=](const ColumnDescriptor* cd) -> bool {
341  return cd->columnId == chunk_cd->columnId;
342  });
343 
344  if (targetColumnIt != columnDescriptors.end()) {
345  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
346 
347  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
348  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
349 
350  ConverterCreateParameter param{
351  num_rows,
352  *catalog,
353  sourceDataMetaInfo,
354  targetDescriptor,
355  targetDescriptor->columnType,
356  !targetDescriptor->columnType.get_notnull(),
357  sourceDataProvider.getLiteralDictionary(),
358  g_enable_experimental_string_functions
359  ? executor->getStringDictionaryProxy(
360  sourceDataMetaInfo.get_type_info().get_comp_param(),
361  executor->getRowSetMemoryOwner(),
362  true)
363  : nullptr};
364  auto converter = factory.create(param);
365  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
366 
367  if (targetDescriptor->columnType.is_geometry()) {
368  // geometry columns are composites
369  // need to skip chunks, depending on geo type
370  switch (targetDescriptor->columnType.get_type()) {
371  case kMULTIPOLYGON:
372  indexOfChunk += 5;
373  break;
374  case kPOLYGON:
375  indexOfChunk += 4;
376  break;
377  case kLINESTRING:
378  indexOfChunk += 2;
379  break;
380  case kPOINT:
381  indexOfChunk += 1;
382  break;
383  default:
384  CHECK(false); // not supported
385  }
386  }
387  } else {
388  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
389  std::unique_ptr<ChunkToInsertDataConverter> converter;
390 
391  if (chunk_cd->columnType.is_fixlen_array()) {
392  converter =
393  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
394  } else if (chunk_cd->columnType.is_string()) {
395  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
396  } else if (chunk_cd->columnType.is_geometry()) {
397  // the logical geo column is a string column
398  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
399  } else {
400  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
401  }
402 
403  chunkConverters.push_back(std::move(converter));
404 
405  } else if (chunk_cd->columnType.is_date_in_days()) {
406  /* Q: Why do we need this?
407  A: In the variable-length update path we move the chunk content of a column
408  without decoding it. Since it passes through DateDaysEncoder again,
409  the expected value should be in seconds, but here it would be in days.
410  Therefore, DateChunkConverter scales the chunk values to seconds, which
411  are then ultimately encoded back into days by DateDaysEncoder.
412  */
413  std::unique_ptr<ChunkToInsertDataConverter> converter;
414  const size_t physical_size = chunk_cd->columnType.get_size();
415  if (physical_size == 2) {
416  converter =
417  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
418  } else if (physical_size == 4) {
419  converter =
420  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
421  } else {
422  CHECK(false);
423  }
424  chunkConverters.push_back(std::move(converter));
425  } else {
426  std::unique_ptr<ChunkToInsertDataConverter> converter;
427  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
428  int logical_size = logical_type.get_size();
429  int physical_size = chunk_cd->columnType.get_size();
430 
431  if (logical_type.is_string()) {
432  // for dicts -> logical = physical
433  logical_size = physical_size;
434  }
435 
436  if (8 == physical_size) {
437  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
438  num_rows, chunk.get());
439  } else if (4 == physical_size) {
440  if (8 == logical_size) {
441  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
442  num_rows, chunk.get());
443  } else {
444  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
445  num_rows, chunk.get());
446  }
447  } else if (2 == chunk_cd->columnType.get_size()) {
448  if (8 == logical_size) {
449  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
450  num_rows, chunk.get());
451  } else if (4 == logical_size) {
452  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
453  num_rows, chunk.get());
454  } else {
455  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
456  num_rows, chunk.get());
457  }
458  } else if (1 == chunk_cd->columnType.get_size()) {
459  if (8 == logical_size) {
460  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
461  num_rows, chunk.get());
462  } else if (4 == logical_size) {
463  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
464  num_rows, chunk.get());
465  } else if (2 == logical_size) {
466  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
467  num_rows, chunk.get());
468  } else {
469  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
470  num_rows, chunk.get());
471  }
472  } else {
473  CHECK(false); // unknown
474  }
475 
476  chunkConverters.push_back(std::move(converter));
477  }
478  }
479  }
480 
481  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
482  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
483 
484  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
485  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
486  deletedChunk->getColumnDesc()->tableId,
487  deletedChunk->getColumnDesc()->columnId,
488  fragment.fragmentId};
489  updelRoll.dirtyChunkeys.insert(chunkey);
490  bool* deletedChunkBuffer =
491  reinterpret_cast<bool*>(deletedChunk->getBuffer()->getMemoryPtr());
492 
493  std::atomic<size_t> row_idx{0};
494 
495  auto row_converter = [&sourceDataProvider,
496  &sourceDataConverters,
497  &indexOffFragmentOffsetColumn,
498  &chunkConverters,
499  &deletedChunkBuffer,
500  &row_idx](size_t indexOfEntry) -> void {
501  // convert the source data
502  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
503  if (row.empty()) {
504  return;
505  }
506 
507  size_t indexOfRow = row_idx.fetch_add(1);
508 
509  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
510  if (sourceDataConverters[col]) {
511  const auto& mapd_variant = row[col];
512  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
513  }
514  }
515 
516  auto scalar = checked_get(
517  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
518  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
519 
520  // convert the remaining chunks
521  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
522  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
523  }
524 
525  // now mark the row as deleted
526  deletedChunkBuffer[indexInChunkBuffer] = true;
527  };
528 
529  bool can_go_parallel = num_rows > 20000;
530 
531  if (can_go_parallel) {
532  const size_t num_worker_threads = cpu_threads();
533  std::vector<std::future<void>> worker_threads;
534  for (size_t i = 0,
535  start_entry = 0,
536  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
537  i < num_worker_threads && start_entry < num_entries;
538  ++i, start_entry += stride) {
539  const auto end_entry = std::min(start_entry + stride, num_rows);
540  worker_threads.push_back(std::async(
541  std::launch::async,
542  [&row_converter](const size_t start, const size_t end) {
543  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
544  row_converter(indexOfRow);
545  }
546  },
547  start_entry,
548  end_entry));
549  }
550 
551  for (auto& child : worker_threads) {
552  child.wait();
553  }
554 
555  } else {
556  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
557  row_converter(entryIdx);
558  }
559  }
560 
561  Fragmenter_Namespace::InsertData insert_data;
562  insert_data.databaseId = catalog->getCurrentDB().dbId;
563  insert_data.tableId = td->tableId;
564 
565  for (size_t i = 0; i < chunkConverters.size(); i++) {
566  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
567  continue;
568  }
569 
570  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
571  if (sourceDataConverters[i]) {
572  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
573  }
574  continue;
575  }
576 
577  insert_data.numRows = num_rows;
578  insertDataNoCheckpoint(insert_data);
579 
580  // update metadata
581  if (!deletedChunk->getBuffer()->has_encoder) {
582  deletedChunk->initEncoder();
583  }
584  deletedChunk->getBuffer()->encoder->updateStats(static_cast<int64_t>(true), false);
585 
586  auto& shadowDeletedChunkMeta =
587  fragment.shadowChunkMetadataMap[deletedChunk->getColumnDesc()->columnId];
588  if (shadowDeletedChunkMeta->numElements >
589  deletedChunk->getBuffer()->encoder->getNumElems()) {
590  // the append will have populated shadow meta data, otherwise use existing num
591  // elements
592  deletedChunk->getBuffer()->encoder->setNumElems(shadowDeletedChunkMeta->numElements);
593  }
594  deletedChunk->getBuffer()->setUpdated();
595 }
596 
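// The fixed-width UPDATE path: values are written in place into the chunk
// buffer, while per-thread min/max/null accumulators are merged afterwards to
// refresh the chunk metadata.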
597 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
598  const TableDescriptor* td,
599  const ColumnDescriptor* cd,
600  const int fragment_id,
601  const std::vector<uint64_t>& frag_offsets,
602  const std::vector<ScalarTargetValue>& rhs_values,
603  const SQLTypeInfo& rhs_type,
604  const Data_Namespace::MemoryLevel memory_level,
605  UpdelRoll& updel_roll) {
606  updel_roll.catalog = catalog;
607  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
608  updel_roll.memoryLevel = memory_level;
609 
610  const size_t ncore = cpu_threads();
611  const auto nrow = frag_offsets.size();
612  const auto n_rhs_values = rhs_values.size();
613  if (0 == nrow) {
614  return;
615  }
616  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
617 
618  auto fragment_ptr = getFragmentInfo(fragment_id);
619  auto& fragment = *fragment_ptr;
620  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
621  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
622  ChunkKey chunk_key{
623  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
624  auto chunk = Chunk_NS::Chunk::getChunk(cd,
625  &catalog->getDataMgr(),
626  chunk_key,
627  memory_level,
628  0,
629  chunk_meta_it->second->numBytes,
630  chunk_meta_it->second->numElements);
631 
632  std::vector<int8_t> has_null_per_thread(ncore, 0);
633  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
634  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
635  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
636  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
637 
638  // parallel update elements
639  std::vector<std::future<void>> threads;
640 
641  const auto segsz = (nrow + ncore - 1) / ncore;
642  auto dbuf = chunk->getBuffer();
643  auto dbuf_addr = dbuf->getMemoryPtr();
644  dbuf->setUpdated();
645  {
646  std::lock_guard<std::mutex> lck(updel_roll.mutex);
647  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
648  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
649  }
650 
651  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
652  cd->tableId,
653  cd->columnId,
654  fragment.fragmentId};
655  updel_roll.dirtyChunkeys.insert(chunkey);
656  }
657  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
658  threads.emplace_back(std::async(
659  std::launch::async,
660  [=,
661  &has_null_per_thread,
662  &min_int64t_per_thread,
663  &max_int64t_per_thread,
664  &min_double_per_thread,
665  &max_double_per_thread,
666  &frag_offsets,
667  &rhs_values] {
668  SQLTypeInfo lhs_type = cd->columnType;
669 
670  // !! not sure if this is an undocumented convention or a bug, but for a sharded
671  // table the dictionary id of an encoded string column is not specified by
672  // comp_param in the physical table but in the logical table: comp_param in the
673  // physical table is always 0, so we need to adapt accordingly...
674  auto cdl = (shard_ < 0)
675  ? cd
676  : catalog->getMetadataForColumn(
677  catalog->getLogicalTableId(td->tableId), cd->columnId);
678  CHECK(cdl);
679  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
680  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
681  lhs_type, &decimalOverflowValidator);
682  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
683  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
684  lhs_type, &dateDaysOverflowValidator);
685 
686  StringDictionary* stringDict{nullptr};
687  if (lhs_type.is_string()) {
688  CHECK(kENCODING_DICT == lhs_type.get_compression());
689  auto dictDesc = const_cast<DictDescriptor*>(
690  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
691  CHECK(dictDesc);
692  stringDict = dictDesc->stringDict.get();
693  CHECK(stringDict);
694  }
695 
696  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
697  const auto roffs = frag_offsets[r];
698  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
699  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
700  ScalarTargetValue sv2;
701 
702  // A subtlety here concerns the two cases of string-to-string assignment, where
703  // upstream passes the RHS string as a string index instead of a preferred "real
704  // string".
705  // case #1. For "SET str_col = str_literal", it is hard to resolve a temp str
706  // index
707  // in this layer, so if upstream passes a str idx here, an
708  // exception is thrown.
709  // case #2. For "SET str_col1 = str_col2", the RHS str idx is converted to the
710  // LHS str idx.
711  if (rhs_type.is_string()) {
712  if (const auto vp = boost::get<int64_t>(sv)) {
713  auto dictDesc = const_cast<DictDescriptor*>(
714  catalog->getMetadataForDict(rhs_type.get_comp_param()));
715  if (nullptr == dictDesc) {
716  throw std::runtime_error(
717  "UPDATE does not support cast from string literal to string "
718  "column.");
719  }
720  auto stringDict = dictDesc->stringDict.get();
721  CHECK(stringDict);
722  sv2 = NullableString(stringDict->getString(*vp));
723  sv = &sv2;
724  }
725  }
726 
727  if (const auto vp = boost::get<int64_t>(sv)) {
728  auto v = *vp;
729  if (lhs_type.is_string()) {
730  throw std::runtime_error("UPDATE does not support cast to string.");
731  }
732  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
733  if (lhs_type.is_decimal()) {
734  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
735  int64_t decimal_val;
736  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
737  tabulate_metadata(lhs_type,
738  min_int64t_per_thread[c],
739  max_int64t_per_thread[c],
740  has_null_per_thread[c],
741  (v == inline_int_null_value<int64_t>() &&
742  lhs_type.get_notnull() == false)
743  ? v
744  : decimal_val);
745  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
746  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
747  if (positive_v_and_negative_d || negative_v_and_positive_d) {
748  throw std::runtime_error(
749  "Data conversion overflow on " + std::to_string(v) +
750  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
751  std::to_string(rhs_type.get_scale()) + ") to (" +
752  std::to_string(lhs_type.get_dimension()) + ", " +
753  std::to_string(lhs_type.get_scale()) + ")");
754  }
755  } else if (is_integral(lhs_type)) {
756  if (lhs_type.is_date_in_days()) {
757  // Store meta values in seconds
758  if (lhs_type.get_size() == 2) {
759  nullAwareDateOverflowValidator.validate<int16_t>(v);
760  } else {
761  nullAwareDateOverflowValidator.validate<int32_t>(v);
762  }
763  int64_t days;
764  get_scalar<int64_t>(data_ptr, lhs_type, days);
765  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
766  tabulate_metadata(lhs_type,
767  min_int64t_per_thread[c],
768  max_int64t_per_thread[c],
769  has_null_per_thread[c],
770  (v == inline_int_null_value<int64_t>() &&
771  lhs_type.get_notnull() == false)
772  ? NullSentinelSupplier()(lhs_type, v)
773  : seconds);
774  } else {
775  int64_t target_value;
776  if (rhs_type.is_decimal()) {
777  target_value = round(decimal_to_double(rhs_type, v));
778  } else {
779  target_value = v;
780  }
781  tabulate_metadata(lhs_type,
782  min_int64t_per_thread[c],
783  max_int64t_per_thread[c],
784  has_null_per_thread[c],
785  target_value);
786  }
787  } else {
788  tabulate_metadata(
789  lhs_type,
790  min_double_per_thread[c],
791  max_double_per_thread[c],
792  has_null_per_thread[c],
793  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
794  }
795  } else if (const auto vp = boost::get<double>(sv)) {
796  auto v = *vp;
797  if (lhs_type.is_string()) {
798  throw std::runtime_error("UPDATE does not support cast to string.");
799  }
800  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
801  if (lhs_type.is_integer()) {
802  tabulate_metadata(lhs_type,
803  min_int64t_per_thread[c],
804  max_int64t_per_thread[c],
805  has_null_per_thread[c],
806  int64_t(v));
807  } else if (lhs_type.is_fp()) {
808  tabulate_metadata(lhs_type,
809  min_double_per_thread[c],
810  max_double_per_thread[c],
811  has_null_per_thread[c],
812  double(v));
813  } else {
814  UNREACHABLE() << "Unexpected combination of a non-floating or integer "
815  "LHS with a floating RHS.";
816  }
817  } else if (const auto vp = boost::get<float>(sv)) {
818  auto v = *vp;
819  if (lhs_type.is_string()) {
820  throw std::runtime_error("UPDATE does not support cast to string.");
821  }
822  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
823  if (lhs_type.is_integer()) {
824  tabulate_metadata(lhs_type,
825  min_int64t_per_thread[c],
826  max_int64t_per_thread[c],
827  has_null_per_thread[c],
828  int64_t(v));
829  } else {
830  tabulate_metadata(lhs_type,
831  min_double_per_thread[c],
832  max_double_per_thread[c],
833  has_null_per_thread[c],
834  double(v));
835  }
836  } else if (const auto vp = boost::get<NullableString>(sv)) {
837  const auto s = boost::get<std::string>(vp);
838  const auto sval = s ? *s : std::string("");
839  if (lhs_type.is_string()) {
840  decltype(stringDict->getOrAdd(sval)) sidx;
841  {
842  std::unique_lock<std::mutex> lock(temp_mutex_);
843  sidx = stringDict->getOrAdd(sval);
844  }
845  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
846  tabulate_metadata(lhs_type,
847  min_int64t_per_thread[c],
848  max_int64t_per_thread[c],
849  has_null_per_thread[c],
850  int64_t(sidx));
851  } else if (sval.size() > 0) {
852  auto dval = std::atof(sval.data());
853  if (lhs_type.is_boolean()) {
854  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
855  } else if (lhs_type.is_time()) {
856  throw std::runtime_error(
857  "Date/Time/Timestamp update not supported through translated "
858  "string path.");
859  }
860  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
861  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
862  tabulate_metadata(lhs_type,
863  min_double_per_thread[c],
864  max_double_per_thread[c],
865  has_null_per_thread[c],
866  double(dval));
867  } else {
868  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
869  tabulate_metadata(lhs_type,
870  min_int64t_per_thread[c],
871  max_int64t_per_thread[c],
872  has_null_per_thread[c],
873  int64_t(dval));
874  }
875  } else {
876  put_null(data_ptr, lhs_type, cd->columnName);
877  has_null_per_thread[c] = true;
878  }
879  } else {
880  CHECK(false);
881  }
882  }
883  }));
884  if (threads.size() >= (size_t)cpu_threads()) {
885  wait_cleanup_threads(threads);
886  }
887  }
888  wait_cleanup_threads(threads);
889 
890  // for unit test
891  if (FragmentInfo::unconditionalVacuum_) {
892  if (cd->isDeletedCol) {
893  const auto deleted_offsets = getVacuumOffsets(chunk);
894  if (deleted_offsets.size() > 0) {
895  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
896  return;
897  }
898  }
899  }
900  bool has_null_per_chunk{false};
901  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
902  double min_double_per_chunk{std::numeric_limits<double>::max()};
903  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
904  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
905  for (size_t c = 0; c < ncore; ++c) {
906  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
907  max_double_per_chunk =
908  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
909  min_double_per_chunk =
910  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
911  max_int64t_per_chunk =
912  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
913  min_int64t_per_chunk =
914  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
915  }
916  updateColumnMetadata(cd,
917  fragment,
918  chunk,
919  has_null_per_chunk,
920  max_double_per_chunk,
921  min_double_per_chunk,
922  max_int64t_per_chunk,
923  min_int64t_per_chunk,
924  cd->columnType,
925  updel_roll);
926 }
927 
928 void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
929  FragmentInfo& fragment,
930  std::shared_ptr<Chunk_NS::Chunk> chunk,
931  const bool has_null_per_chunk,
932  const double max_double_per_chunk,
933  const double min_double_per_chunk,
934  const int64_t max_int64t_per_chunk,
935  const int64_t min_int64t_per_chunk,
936  const SQLTypeInfo& rhs_type,
937  UpdelRoll& updel_roll) {
938  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
939  auto key = std::make_pair(td, &fragment);
940  std::lock_guard<std::mutex> lck(updel_roll.mutex);
941  if (0 == updel_roll.chunkMetadata.count(key)) {
942  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
943  }
944  if (0 == updel_roll.numTuples.count(key)) {
945  updel_roll.numTuples[key] = fragment.shadowNumTuples;
946  }
947  auto& chunkMetadata = updel_roll.chunkMetadata[key];
948 
949  auto buffer = chunk->getBuffer();
950  const auto& lhs_type = cd->columnType;
951 
952  auto encoder = buffer->encoder.get();
953  auto update_stats = [&encoder](auto min, auto max, auto has_null) {
954  static_assert(std::is_same<decltype(min), decltype(max)>::value,
955  "Type mismatch on min/max");
956  if (has_null) {
957  encoder->updateStats(decltype(min)(), true);
958  }
959  if (max < min) {
960  return;
961  }
962  encoder->updateStats(min, false);
963  encoder->updateStats(max, false);
964  };
965 
966  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
967  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
968  } else if (lhs_type.is_fp()) {
969  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
970  } else if (lhs_type.is_decimal()) {
971  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
972  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
973  has_null_per_chunk);
974  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
975  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
976  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
977  }
978  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
979 
980  // removed as @alex suggests. keep it commented in case of any chance to revisit
981  // it once after vacuum code is introduced. fragment.invalidateChunkMetadataMap();
982 }
983 
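// Publishes the metadata captured in the UpdelRoll back into the fragment,
// under fragmentInfoMutex_, when an update/delete is committed.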
984 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
985  const MetaDataKey& key,
986  UpdelRoll& updel_roll) {
987  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
988  if (updel_roll.chunkMetadata.count(key)) {
989  auto& fragmentInfo = *key.second;
990  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
991  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
992  fragmentInfo.setChunkMetadataMap(chunkMetadata);
993  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
994  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
995  }
996 }
997 
998 auto InsertOrderFragmenter::getChunksForAllColumns(
999  const TableDescriptor* td,
1000  const FragmentInfo& fragment,
1001  const Data_Namespace::MemoryLevel memory_level) {
1002  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1003  // coming from updateColumn (on the '$delete$' column) we don't have chunks for all columns
1004  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1005  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1006  ++ncol;
1007  if (!cd->isVirtualCol) {
1008  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1009  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1010  ChunkKey chunk_key{
1011  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1012  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1013  &catalog_->getDataMgr(),
1014  chunk_key,
1015  memory_level,
1016  0,
1017  chunk_meta_it->second->numBytes,
1018  chunk_meta_it->second->numElements);
1019  chunks.push_back(chunk);
1020  }
1021  }
1022  }
1023  return chunks;
1024 }
1025 
1026 // get a sorted vector of offsets of rows to vacuum
1027 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1028  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1029  const auto data_buffer = chunk->getBuffer();
1030  const auto data_addr = data_buffer->getMemoryPtr();
1031  const size_t nrows_in_chunk = data_buffer->size();
1032  const size_t ncore = cpu_threads();
1033  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1034  std::vector<std::vector<uint64_t>> deleted_offsets;
1035  deleted_offsets.resize(ncore);
1036  std::vector<std::future<void>> threads;
1037  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1038  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1039  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1040  const auto ithread = rbegin / segsz;
1041  CHECK(ithread < deleted_offsets.size());
1042  deleted_offsets[ithread].reserve(segsz);
1043  for (size_t r = rbegin; r < rend; ++r) {
1044  if (data_addr[r]) {
1045  deleted_offsets[ithread].push_back(r);
1046  }
1047  }
1048  }));
1049  }
1050  wait_cleanup_threads(threads);
1051  std::vector<uint64_t> all_deleted_offsets;
1052  for (size_t i = 0; i < ncore; ++i) {
1053  all_deleted_offsets.insert(
1054  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1055  }
1056  return all_deleted_offsets;
1057 }
1058 
1059 template <typename T>
1060 static void set_chunk_stats(const SQLTypeInfo& col_type,
1061  int8_t* data_addr,
1062  int8_t& has_null,
1063  T& min,
1064  T& max) {
1065  T v;
1066  const auto can_be_null = !col_type.get_notnull();
1067  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1068  if (is_null) {
1069  has_null = has_null || (can_be_null && is_null);
1070  } else {
1071  set_minmax(min, max, v);
1072  }
1073 }
1074 
1075 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1076  FragmentInfo& fragment,
1077  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1078  const size_t nrows_to_keep,
1079  UpdelRoll& updel_roll) {
1080  auto cd = chunk->getColumnDesc();
1081  auto td = catalog->getMetadataForTable(cd->tableId);
1082  auto data_buffer = chunk->getBuffer();
1083  std::lock_guard<std::mutex> lck(updel_roll.mutex);
1084  const auto key = std::make_pair(td, &fragment);
1085  if (0 == updel_roll.chunkMetadata.count(key)) {
1086  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
1087  }
1088  auto& chunkMetadata = updel_roll.chunkMetadata[key];
1089  chunkMetadata[cd->columnId]->numElements = nrows_to_keep;
1090  chunkMetadata[cd->columnId]->numBytes = data_buffer->size();
1091  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
1092  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
1093  }
1094 }
1095 
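// vacuum_fixlen_rows(): compacts a fixed-width chunk in place by moving each
// block of surviving rows over the gaps left by deleted rows; returns the
// number of bytes of fixed-length data kept.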
1096 auto vacuum_fixlen_rows(
1097  const FragmentInfo& fragment,
1098  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1099  const std::vector<uint64_t>& frag_offsets) {
1100  const auto cd = chunk->getColumnDesc();
1101  const auto& col_type = cd->columnType;
1102  auto data_buffer = chunk->getBuffer();
1103  auto data_addr = data_buffer->getMemoryPtr();
1104  auto element_size =
1105  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1106  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1107  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1108  size_t nbytes_fix_data_to_keep = 0;
1109  auto nrows_to_vacuum = frag_offsets.size();
1110  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1111  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1112  auto is_last_one = irow == nrows_to_vacuum;
1113  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1114  auto maddr_to_vacuum = data_addr;
1115  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1116  if (nrows_to_keep > 0) {
1117  auto nbytes_to_keep = nrows_to_keep * element_size;
1118  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1119  // move curr fixlen row block toward front
1120  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1121  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1122  nbytes_to_keep);
1123  }
1124  irow_of_blk_to_fill += nrows_to_keep;
1125  nbytes_fix_data_to_keep += nbytes_to_keep;
1126  }
1127  irow_of_blk_to_keep = irow_to_vacuum + 1;
1128  }
1129  return nbytes_fix_data_to_keep;
1130 }
1131 
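// vacuum_varlen_rows(): the same compaction for variable-length chunks; both
// the payload buffer and the StringOffsetT index array are rewritten so the
// surviving rows' offsets remain consistent; returns the payload bytes kept.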
1132 auto vacuum_varlen_rows(
1133  const FragmentInfo& fragment,
1134  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1135  const std::vector<uint64_t>& frag_offsets) {
1136  auto data_buffer = chunk->getBuffer();
1137  auto index_buffer = chunk->getIndexBuf();
1138  auto data_addr = data_buffer->getMemoryPtr();
1139  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1140  auto index_array = (StringOffsetT*)indices_addr;
1141  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1142  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1143  size_t nbytes_fix_data_to_keep = 0;
1144  size_t nbytes_var_data_to_keep = 0;
1145  auto nrows_to_vacuum = frag_offsets.size();
1146  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1147  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1148  auto is_last_one = irow == nrows_to_vacuum;
1149  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1150  auto maddr_to_vacuum = data_addr;
1151  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1152  if (nrows_to_keep > 0) {
1153  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1154  auto nbytes_to_keep =
1155  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1156  index_array[irow_of_blk_to_keep];
1157  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1158  // move curr varlen row block toward front
1159  memmove(data_addr + ibyte_var_data_to_keep,
1160  data_addr + index_array[irow_of_blk_to_keep],
1161  nbytes_to_keep);
1162 
1163  const auto index_base = index_array[irow_of_blk_to_keep];
1164  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1165  auto& index = index_array[irow_of_blk_to_keep + i];
1166  index = ibyte_var_data_to_keep + (index - index_base);
1167  }
1168  }
1169  nbytes_var_data_to_keep += nbytes_to_keep;
1170  maddr_to_vacuum = indices_addr;
1171 
1172  constexpr static auto index_element_size = sizeof(StringOffsetT);
1173  nbytes_to_keep = nrows_to_keep * index_element_size;
1174  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1175  // move curr fixlen row block toward front
1176  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1177  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1178  nbytes_to_keep);
1179  }
1180  irow_of_blk_to_fill += nrows_to_keep;
1181  nbytes_fix_data_to_keep += nbytes_to_keep;
1182  }
1183  irow_of_blk_to_keep = irow_to_vacuum + 1;
1184  }
1185  return nbytes_var_data_to_keep;
1186 }
1187 
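// compactRows(): vacuums every column chunk of a fragment in parallel (one
// task per chunk, fixed-length or variable-length as appropriate) and then
// recomputes per-chunk metadata for the surviving rows.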
1188 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1189  const TableDescriptor* td,
1190  const int fragment_id,
1191  const std::vector<uint64_t>& frag_offsets,
1192  const Data_Namespace::MemoryLevel memory_level,
1193  UpdelRoll& updel_roll) {
1194  auto fragment_ptr = getFragmentInfo(fragment_id);
1195  auto& fragment = *fragment_ptr;
1196  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1197  const auto ncol = chunks.size();
1198 
1199  std::vector<int8_t> has_null_per_thread(ncol, 0);
1200  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1201  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1202  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
1203  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());
1204 
1205  // parallel delete columns
1206  std::vector<std::future<void>> threads;
1207  auto nrows_to_vacuum = frag_offsets.size();
1208  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1209  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1210 
1211  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1212  auto chunk = chunks[ci];
1213  const auto cd = chunk->getColumnDesc();
1214  const auto& col_type = cd->columnType;
1215  auto data_buffer = chunk->getBuffer();
1216  auto index_buffer = chunk->getIndexBuf();
1217  auto data_addr = data_buffer->getMemoryPtr();
1218  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1219  auto index_array = (StringOffsetT*)indices_addr;
1220  bool is_varlen = col_type.is_varlen_indeed();
1221 
1222  auto fixlen_vacuum = [=,
1223  &has_null_per_thread,
1224  &max_double_per_thread,
1225  &min_double_per_thread,
1226  &min_int64t_per_thread,
1227  &max_int64t_per_thread,
1228  &updel_roll,
1229  &frag_offsets,
1230  &fragment] {
1231  size_t nbytes_fix_data_to_keep;
1232  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1233 
1234  data_buffer->encoder->setNumElems(nrows_to_keep);
1235  data_buffer->setSize(nbytes_fix_data_to_keep);
1236  data_buffer->setUpdated();
1237 
1238  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1239 
1240  auto daddr = data_addr;
1241  auto element_size =
1242  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1243  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1244  if (col_type.is_fixlen_array()) {
1245  auto encoder =
1246  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1247  CHECK(encoder);
1248  encoder->updateMetadata((int8_t*)daddr);
1249  } else if (col_type.is_fp()) {
1250  set_chunk_stats(col_type,
1251  data_addr,
1252  has_null_per_thread[ci],
1253  min_double_per_thread[ci],
1254  max_double_per_thread[ci]);
1255  } else {
1256  set_chunk_stats(col_type,
1257  data_addr,
1258  has_null_per_thread[ci],
1259  min_int64t_per_thread[ci],
1260  max_int64t_per_thread[ci]);
1261  }
1262  }
1263  };
1264 
1265  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1266  size_t nbytes_var_data_to_keep;
1267  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1268 
1269  data_buffer->encoder->setNumElems(nrows_to_keep);
1270  data_buffer->setSize(nbytes_var_data_to_keep);
1271  data_buffer->setUpdated();
1272 
1273  index_array[nrows_to_keep] = data_buffer->size();
1274  index_buffer->setSize(sizeof(*index_array) *
1275  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1276  index_buffer->setUpdated();
1277 
1278  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1279  };
1280 
1281  if (is_varlen) {
1282  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1283  } else {
1284  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1285  }
1286  if (threads.size() >= (size_t)cpu_threads()) {
1287  wait_cleanup_threads(threads);
1288  }
1289  }
1290 
1291  wait_cleanup_threads(threads);
1292 
1293  auto key = std::make_pair(td, &fragment);
1294  updel_roll.numTuples[key] = nrows_to_keep;
1295  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1296  auto chunk = chunks[ci];
1297  auto cd = chunk->getColumnDesc();
1298  if (!cd->columnType.is_fixlen_array()) {
1299  updateColumnMetadata(cd,
1300  fragment,
1301  chunk,
1302  has_null_per_thread[ci],
1303  max_double_per_thread[ci],
1304  min_double_per_thread[ci],
1305  max_int64t_per_thread[ci],
1306  min_int64t_per_thread[ci],
1307  cd->columnType,
1308  updel_roll);
1309  }
1310  }
1311 }
1312 
1313 } // namespace Fragmenter_Namespace
1314 
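// UpdelRoll::commitUpdate() checkpoints the table (when persisted to disk),
// publishes the shadow metadata through updateMetadata(), and evicts stale GPU
// copies of dirty chunks; cancelUpdate() instead rolls the table epoch back or
// frees the dirty chunk buffers.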
1315 void UpdelRoll::commitUpdate() {
1316  if (nullptr == catalog) {
1317  return;
1318  }
1319  const auto td = catalog->getMetadataForTable(logicalTableId);
1320  CHECK(td);
1321  // checkpoint all shards regardless, or epoch becomes out of sync
1322  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1323  catalog->checkpoint(logicalTableId);
1324  }
1325  // for each dirty fragment
1326  for (auto& cm : chunkMetadata) {
1327  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1328  }
1329  dirtyChunks.clear();
1330  // flush gpu dirty chunks if update was not on gpu
1331  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1332  for (const auto& chunkey : dirtyChunkeys) {
1333  catalog->getDataMgr().deleteChunksWithPrefix(
1334  chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
1335  }
1336  }
1337 }
1338 
1339 void UpdelRoll::cancelUpdate() {
1340  if (nullptr == catalog) {
1341  return;
1342  }
1343 
1344  if (is_varlen_update) {
1345  int databaseId = catalog->getCurrentDB().dbId;
1346  int32_t tableEpoch = catalog->getTableEpoch(databaseId, logicalTableId);
1347 
1348  dirtyChunks.clear();
1349  const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpoch(
1350  databaseId, logicalTableId, tableEpoch);
1351  } else {
1352  const auto td = catalog->getMetadataForTable(logicalTableId);
1353  CHECK(td);
1354  if (td->persistenceLevel != memoryLevel) {
1355  for (auto dit : dirtyChunks) {
1356  catalog->getDataMgr().free(dit.first->getBuffer());
1357  dit.first->setBuffer(nullptr);
1358  }
1359  }
1360  }
1361 }