/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <boost/variant.hpp>
#include <boost/variant/get.hpp>
#include <limits>
#include <mutex>
#include <string>
#include <vector>

#include "Catalog/Catalog.h"
#include "DataMgr/DataMgr.h"
#include "Fragmenter/InsertOrderFragmenter.h"
#include "Fragmenter/UpdelRoll.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/TargetValue.h"
#include "Shared/ConfigResolve.h"
#include "Shared/DateConverters.h"
#include "Shared/Logger.h"
#include "Shared/TypedDataAccessors.h"
#include "Shared/thread_count.h"
#include "TargetValueConvertersFactories.h"

extern bool g_enable_experimental_string_functions;

namespace Fragmenter_Namespace {

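// Joins and clears a batch of worker futures. Note that get() (unlike wait())
// rethrows any exception raised inside a worker, so a failed task surfaces in
// the caller instead of being silently swallowed.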
inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
  for (auto& t : threads) {
    t.get();
  }
  threads.clear();
}

FragmentInfo& InsertOrderFragmenter::getFragmentInfoFromId(const int fragment_id) {
  auto fragment_it = std::find_if(
      fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
        return f.fragmentId == fragment_id;
      });
  CHECK(fragment_it != fragmentInfoVec_.end());
  return *fragment_it;
}

inline bool is_integral(const SQLTypeInfo& t) {
  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
}

bool FragmentInfo::unconditionalVacuum_{false};

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const std::string& tab_name,
                                         const std::string& col_name,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const std::vector<ScalarTargetValue>& rhs_values,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  const auto td = catalog->getMetadataForTable(tab_name);
  CHECK(td);
  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
  CHECK(cd);
  td->fragmenter->updateColumn(catalog,
                               td,
                               cd,
                               fragment_id,
                               frag_offsets,
                               rhs_values,
                               rhs_type,
                               memory_level,
                               updel_roll);
}

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const ScalarTargetValue& rhs_value,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updateColumn(catalog,
               td,
               cd,
               fragment_id,
               frag_offsets,
               std::vector<ScalarTargetValue>(1, rhs_value),
               rhs_type,
               memory_level,
               updel_roll);
}
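// Illustrative call sequence for the overloads above (a hedged sketch, not code
// from this file; `catalog` and the literal values are assumed to be supplied by
// the caller):
//
//   UpdelRoll roll;
//   InsertOrderFragmenter::updateColumn(catalog,
//                                       "tbl", "col",        // resolved via metadata
//                                       /*fragment_id=*/0,
//                                       /*frag_offsets=*/{0, 1, 2},
//                                       ScalarTargetValue(int64_t(42)),  // broadcast
//                                       SQLTypeInfo(kBIGINT, false),
//                                       Data_Namespace::MemoryLevel::CPU_LEVEL,
//                                       roll);
//   roll.commitUpdate();
//
// A single RHS value is broadcast to every offset; otherwise rhs_values must
// match frag_offsets element for element (see the CHECK in updateColumn below).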

static int get_chunks(const Catalog_Namespace::Catalog* catalog,
                      const TableDescriptor* td,
                      const FragmentInfo& fragment,
                      const Data_Namespace::MemoryLevel memory_level,
                      std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
  // column ids may be sparse, so count real columns (nc) while scanning ids (cid)
  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
    if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
      ++nc;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second.numBytes,
                                               chunk_meta_it->second.numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks.size();
}

class ChunkToInsertDataConverter {
 public:
  virtual ~ChunkToInsertDataConverter() {}

  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;

  virtual void addDataBlocksToInsertData(
      Fragmenter_Namespace::InsertData& insertData) = 0;
};

template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
struct ScalarChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr =
      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;

  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;
  ColumnDataPtr column_data_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
    column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
        checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
  }

  ~ScalarChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
    column_data_.get()[row] = insert_value;
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};
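// The two template parameters above let a scalar chunk be widened while it is
// re-materialized: BUFFER_DATA_TYPE is the physical type stored in the chunk
// buffer, INSERT_DATA_TYPE the logical type the insert path expects. A sketch
// (assuming a BIGINT column stored with 4-byte encoding):
//
//   auto conv = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
//       num_rows, chunk.get());
//   conv->convertToColumnarFormat(/*row=*/0, /*indexInFragment=*/7);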

struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
  const int8_t* data_buffer_addr_;
  size_t fixed_array_length_;

  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
    column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
    data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
    fixed_array_length_ = chunk->get_column_desc()->columnType.get_size();
  }

  ~FixedLenArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);

    bool is_null = FixedLengthArrayNoneEncoder::is_null(column_descriptor_->columnType,
                                                        (int8_t*)src_value_ptr);

    (*column_data_)[row] = ArrayDatum(
        fixed_array_length_, (int8_t*)src_value_ptr, is_null, DoNothingDeleter());
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.arraysPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
  const ArrayOffsetT* index_buffer_addr_;

  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : FixedLenArrayChunkConverter(num_rows, chunk) {
    index_buffer_addr_ =
        (ArrayOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
                                               : nullptr);
  }

  ~ArrayChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto startIndex = index_buffer_addr_[indexInFragment];
    auto endIndex = index_buffer_addr_[indexInFragment + 1];
    size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = ArrayDatum(
        src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
  }
};
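// Variable-length arrays encode NULL by negating the end offset, which is why
// convertToColumnarFormat() above takes std::abs() of both offsets and passes
// `endIndex < 0` as the is-null flag. Sketch of an index buffer for three
// 8-byte arrays where the second is NULL:
//
//   index: {0, 8, -16, 24}  ->  sizes 8, 8, 8; entry 1 is NULL since index[2] < 0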

struct StringChunkConverter : public ChunkToInsertDataConverter {
  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;

  std::unique_ptr<std::vector<std::string>> column_data_;
  const int8_t* data_buffer_addr_;
  const StringOffsetT* index_buffer_addr_;

  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
    column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
    data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
    index_buffer_addr_ =
        (StringOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
                                                : nullptr);
  }

  ~StringChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    size_t src_value_size =
        index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
    auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
    (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.stringsPtr = column_data_.get();
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};

template <typename BUFFER_DATA_TYPE>
struct DateChunkConverter : public ChunkToInsertDataConverter {
  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;

  const Chunk_NS::Chunk* chunk_;
  const ColumnDescriptor* column_descriptor_;
  ColumnDataPtr column_data_;
  const BUFFER_DATA_TYPE* data_buffer_addr_;

  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
      : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
    column_data_ = ColumnDataPtr(
        reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
    data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
  }

  ~DateChunkConverter() override {}

  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
    auto buffer_value = data_buffer_addr_[indexInFragment];
    auto insert_value = static_cast<int64_t>(buffer_value);
    column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
  }

  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
    DataBlockPtr dataBlock;
    dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
    insertData.data.push_back(dataBlock);
    insertData.columnIds.push_back(column_descriptor_->columnId);
  }
};
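// DATE columns stored in days are rescaled to epoch seconds here because the
// re-insert path runs the values through DateDaysEncoder again, which expects
// seconds (see the comment in updateColumns() below). E.g. a stored value of
// 18000 days becomes 18000 * 86400 = 1555200000 seconds before re-encoding.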

void InsertOrderFragmenter::updateColumns(
    const Catalog_Namespace::Catalog* catalog,
    const TableDescriptor* td,
    const int fragmentId,
    const std::vector<TargetMetaInfo> sourceMetaInfo,
    const std::vector<const ColumnDescriptor*> columnDescriptors,
    const RowDataProvider& sourceDataProvider,
    const size_t indexOffFragmentOffsetColumn,
    const Data_Namespace::MemoryLevel memoryLevel,
    UpdelRoll& updelRoll) {
  if (!is_feature_enabled<VarlenUpdates>()) {
    throw std::runtime_error("varlen UPDATE path not enabled.");
  }

  updelRoll.is_varlen_update = true;
  updelRoll.catalog = catalog;
  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updelRoll.memoryLevel = memoryLevel;

  size_t num_entries = sourceDataProvider.getEntryCount();
  size_t num_rows = sourceDataProvider.getRowCount();

  if (0 == num_rows) {
    // bail out early
    return;
  }

  TargetValueConverterFactory factory;

  auto& fragment = getFragmentInfoFromId(fragmentId);
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  get_chunks(catalog, td, fragment, memoryLevel, chunks);
  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
      columnDescriptors.size());
  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
  std::shared_ptr<Executor> executor;

  if (g_enable_experimental_string_functions) {
    executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
  }

  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
    auto chunk = chunks[indexOfChunk];
    const auto chunk_cd = chunk->get_column_desc();

    if (chunk_cd->isDeletedCol) {
      deletedChunk = chunk;
      continue;
    }

    auto targetColumnIt = std::find_if(columnDescriptors.begin(),
                                       columnDescriptors.end(),
                                       [=](const ColumnDescriptor* cd) -> bool {
                                         return cd->columnId == chunk_cd->columnId;
                                       });

    if (targetColumnIt != columnDescriptors.end()) {
      auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);

      auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
      auto targetDescriptor = columnDescriptors[indexOfTargetColumn];

      ConverterCreateParameter param{
          num_rows,
          *catalog,
          sourceDataMetaInfo,
          targetDescriptor,
          targetDescriptor->columnType,
          !targetDescriptor->columnType.get_notnull(),
          sourceDataProvider.getLiteralDictionary(),
          g_enable_experimental_string_functions
              ? executor->getStringDictionaryProxy(
                    sourceDataMetaInfo.get_type_info().get_comp_param(),
                    executor->getRowSetMemoryOwner(),
                    true)
              : nullptr};
      auto converter = factory.create(param);
      sourceDataConverters[indexOfTargetColumn] = std::move(converter);

      if (targetDescriptor->columnType.is_geometry()) {
        // geometry columns are composites
        // need to skip chunks, depending on geo type
        switch (targetDescriptor->columnType.get_type()) {
          case kMULTIPOLYGON:
            indexOfChunk += 5;
            break;
          case kPOLYGON:
            indexOfChunk += 4;
            break;
          case kLINESTRING:
            indexOfChunk += 2;
            break;
          case kPOINT:
            indexOfChunk += 1;
            break;
          default:
            CHECK(false);  // not supported
        }
      }
    } else {
      if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
        std::unique_ptr<ChunkToInsertDataConverter> converter;

        if (chunk_cd->columnType.is_fixlen_array()) {
          converter =
              std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_string()) {
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else if (chunk_cd->columnType.is_geometry()) {
          // the logical geo column is a string column
          converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
        } else {
          converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
        }

        chunkConverters.push_back(std::move(converter));

      } else if (chunk_cd->columnType.is_date_in_days()) {
        /* Q: Why do we need this?
           A: In the variable-length UPDATE path we move chunk contents without
           decoding. Since the data passes through DateDaysEncoder again, the
           expected value should be in seconds, but here it is in days.
           DateChunkConverter therefore scales chunk values to seconds, which
           DateDaysEncoder then ultimately re-encodes in days.
        */
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        const size_t physical_size = chunk_cd->columnType.get_size();
        if (physical_size == 2) {
          converter =
              std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
        } else if (physical_size == 4) {
          converter =
              std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
        } else {
          CHECK(false);
        }
        chunkConverters.push_back(std::move(converter));
      } else {
        std::unique_ptr<ChunkToInsertDataConverter> converter;
        SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
        int logical_size = logical_type.get_size();
        int physical_size = chunk_cd->columnType.get_size();

        if (logical_type.is_string()) {
          // for dicts -> logical = physical
          logical_size = physical_size;
        }

        if (8 == physical_size) {
          converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
              num_rows, chunk.get());
        } else if (4 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
                num_rows, chunk.get());
          }
        } else if (2 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
                num_rows, chunk.get());
          }
        } else if (1 == physical_size) {
          if (8 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
                num_rows, chunk.get());
          } else if (4 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
                num_rows, chunk.get());
          } else if (2 == logical_size) {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
                num_rows, chunk.get());
          } else {
            converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
                num_rows, chunk.get());
          }
        } else {
          CHECK(false);  // unknown
        }

        chunkConverters.push_back(std::move(converter));
      }
    }
  }

  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;

  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
                   deletedChunk->get_column_desc()->tableId,
                   deletedChunk->get_column_desc()->columnId,
                   fragment.fragmentId};
  updelRoll.dirtyChunkeys.insert(chunkey);
  bool* deletedChunkBuffer =
      reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());

  std::atomic<size_t> row_idx{0};

  auto row_converter = [&sourceDataProvider,
                        &sourceDataConverters,
                        &indexOffFragmentOffsetColumn,
                        &chunkConverters,
                        &deletedChunkBuffer,
                        &row_idx](size_t indexOfEntry) -> void {
    // convert the source data
    const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
    if (row.empty()) {
      return;
    }

    size_t indexOfRow = row_idx.fetch_add(1);

    for (size_t col = 0; col < sourceDataConverters.size(); col++) {
      if (sourceDataConverters[col]) {
        const auto& mapd_variant = row[col];
        sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
      }
    }

    auto scalar = checked_get(
        indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
    auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);

    // convert the remaining chunks
    for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
      chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
    }

    // now mark the row as deleted
    deletedChunkBuffer[indexInChunkBuffer] = true;
  };

  bool can_go_parallel = num_rows > 20000;

  if (can_go_parallel) {
    const size_t num_worker_threads = cpu_threads();
    std::vector<std::future<void>> worker_threads;
    for (size_t i = 0,
                start_entry = 0,
                stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
         i < num_worker_threads && start_entry < num_entries;
         ++i, start_entry += stride) {
      // bound by num_entries, not num_rows: the workers walk provider entries,
      // some of which may be empty (see row.empty() above)
      const auto end_entry = std::min(start_entry + stride, num_entries);
      worker_threads.push_back(std::async(
          std::launch::async,
          [&row_converter](const size_t start, const size_t end) {
            for (size_t indexOfEntry = start; indexOfEntry < end; ++indexOfEntry) {
              row_converter(indexOfEntry);
            }
          },
          start_entry,
          end_entry));
    }

    for (auto& child : worker_threads) {
      child.wait();
    }

  } else {
    for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
      row_converter(entryIdx);
    }
  }

  Fragmenter_Namespace::InsertData insert_data;
  insert_data.databaseId = catalog->getCurrentDB().dbId;
  insert_data.tableId = td->tableId;

  for (size_t i = 0; i < chunkConverters.size(); i++) {
    chunkConverters[i]->addDataBlocksToInsertData(insert_data);
  }

  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
    if (sourceDataConverters[i]) {
      sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
    }
  }

  insert_data.numRows = num_rows;
  insertDataNoCheckpoint(insert_data);

  // update metadata
  if (!deletedChunk->get_buffer()->has_encoder) {
    deletedChunk->init_encoder();
  }
  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);

  auto& shadowDeletedChunkMeta =
      fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
  if (shadowDeletedChunkMeta.numElements >
      deletedChunk->get_buffer()->encoder->getNumElems()) {
    // the append will have populated shadow meta data, otherwise use existing num
    // elements
    deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
  }
  deletedChunk->get_buffer()->setUpdated();
}
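// Design note: the variable-length UPDATE above is executed as delete-plus-append.
// New row versions are appended through insertDataNoCheckpoint() and the original
// rows are flagged via the deleted-row column (isDeletedCol), so nothing is
// rewritten in place; that is why the deleted chunk's encoder stats and shadow
// metadata are refreshed at the very end.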

void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
                                         const TableDescriptor* td,
                                         const ColumnDescriptor* cd,
                                         const int fragment_id,
                                         const std::vector<uint64_t>& frag_offsets,
                                         const std::vector<ScalarTargetValue>& rhs_values,
                                         const SQLTypeInfo& rhs_type,
                                         const Data_Namespace::MemoryLevel memory_level,
                                         UpdelRoll& updel_roll) {
  updel_roll.catalog = catalog;
  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
  updel_roll.memoryLevel = memory_level;

  const size_t ncore = cpu_threads();
  const auto nrow = frag_offsets.size();
  const auto n_rhs_values = rhs_values.size();
  if (0 == nrow) {
    return;
  }
  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);

  auto& fragment = getFragmentInfoFromId(fragment_id);
  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
  ChunkKey chunk_key{
      catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
  auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                         &catalog->getDataMgr(),
                                         chunk_key,
                                         memory_level,
                                         0,
                                         chunk_meta_it->second.numBytes,
                                         chunk_meta_it->second.numElements);

  std::vector<int8_t> has_null_per_thread(ncore, 0);
  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());

  // parallel update elements
  std::vector<std::future<void>> threads;

  const auto segsz = (nrow + ncore - 1) / ncore;
  auto dbuf = chunk->get_buffer();
  auto dbuf_addr = dbuf->getMemoryPtr();
  dbuf->setUpdated();
  {
    std::lock_guard<std::mutex> lck(updel_roll.mutex);
    if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
      updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
    }

    ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
                     cd->tableId,
                     cd->columnId,
                     fragment.fragmentId};
    updel_roll.dirtyChunkeys.insert(chunkey);
  }
  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
    threads.emplace_back(std::async(
        std::launch::async,
        [=,
         &has_null_per_thread,
         &min_int64t_per_thread,
         &max_int64t_per_thread,
         &min_double_per_thread,
         &max_double_per_thread,
         &frag_offsets,
         &rhs_values] {
          SQLTypeInfo lhs_type = cd->columnType;

          // !! Unclear whether this is an undocumented convention or a bug: for a
          // sharded table, the dictionary id of a dict-encoded string column is
          // specified by comp_param in the logical table, not in the physical
          // table, where comp_param is always 0. Adapt accordingly.
          auto cdl = (shard_ < 0)
                         ? cd
                         : catalog->getMetadataForColumn(
                               catalog->getLogicalTableId(td->tableId), cd->columnId);
          CHECK(cdl);
          DecimalOverflowValidator decimalOverflowValidator(lhs_type);
          NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
              lhs_type, &decimalOverflowValidator);
          DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
          NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
              lhs_type, &dateDaysOverflowValidator);

          StringDictionary* stringDict{nullptr};
          if (lhs_type.is_string()) {
            CHECK(kENCODING_DICT == lhs_type.get_compression());
            auto dictDesc = const_cast<DictDescriptor*>(
                catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
            CHECK(dictDesc);
            stringDict = dictDesc->stringDict.get();
            CHECK(stringDict);
          }

          for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
            const auto roffs = frag_offsets[r];
            auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
            auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
            ScalarTargetValue sv2;

            // A subtlety in the two string-to-string assignment cases, where the
            // upstream may pass the RHS string as a string index instead of a
            // preferred "real string":
            // case #1: for "SET str_col = str_literal", a temp string index is
            //          hard to resolve in this layer, so if upstream passes a
            //          string index here, an exception is thrown.
            // case #2: for "SET str_col1 = str_col2", the RHS string index is
            //          converted to an LHS string index.
            if (rhs_type.is_string()) {
              if (const auto vp = boost::get<int64_t>(sv)) {
                auto dictDesc = const_cast<DictDescriptor*>(
                    catalog->getMetadataForDict(rhs_type.get_comp_param()));
                if (nullptr == dictDesc) {
                  throw std::runtime_error(
                      "UPDATE does not support cast from string literal to string "
                      "column.");
                }
                auto stringDict = dictDesc->stringDict.get();
                CHECK(stringDict);
                sv2 = NullableString(stringDict->getString(*vp));
                sv = &sv2;
              }
            }

            if (const auto vp = boost::get<int64_t>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
              if (lhs_type.is_decimal()) {
                nullAwareDecimalOverflowValidator.validate<int64_t>(v);
                int64_t decimal_val;
                get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  (v == inline_int_null_value<int64_t>() &&
                                   lhs_type.get_notnull() == false)
                                      ? v
                                      : decimal_val);
                auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
                auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
                if (positive_v_and_negative_d || negative_v_and_positive_d) {
                  throw std::runtime_error(
                      "Data conversion overflow on " + std::to_string(v) +
                      " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) +
                      ", " + std::to_string(rhs_type.get_scale()) + ") to (" +
                      std::to_string(lhs_type.get_dimension()) + ", " +
                      std::to_string(lhs_type.get_scale()) + ")");
                }
              } else if (is_integral(lhs_type)) {
                if (lhs_type.is_date_in_days()) {
                  // Store meta values in seconds
                  if (lhs_type.get_size() == 2) {
                    nullAwareDateOverflowValidator.validate<int16_t>(v);
                  } else {
                    nullAwareDateOverflowValidator.validate<int32_t>(v);
                  }
                  int64_t days;
                  get_scalar<int64_t>(data_ptr, lhs_type, days);
                  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    (v == inline_int_null_value<int64_t>() &&
                                     lhs_type.get_notnull() == false)
                                        ? NullSentinelSupplier()(lhs_type, v)
                                        : seconds);
                } else {
                  int64_t target_value;
                  if (rhs_type.is_decimal()) {
                    target_value = round(decimal_to_double(rhs_type, v));
                  } else {
                    target_value = v;
                  }
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    target_value);
                }
              } else {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v)
                                                        : v);
              }
            } else if (const auto vp = boost::get<double>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else if (lhs_type.is_fp()) {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              } else {
                UNREACHABLE() << "Unexpected combination of a non-floating or integer "
                                 "LHS with a floating RHS.";
              }
            } else if (const auto vp = boost::get<float>(sv)) {
              auto v = *vp;
              if (lhs_type.is_string()) {
                throw std::runtime_error("UPDATE does not support cast to string.");
              }
              put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
              if (lhs_type.is_integer()) {
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(v));
              } else {
                tabulate_metadata(lhs_type,
                                  min_double_per_thread[c],
                                  max_double_per_thread[c],
                                  has_null_per_thread[c],
                                  double(v));
              }
            } else if (const auto vp = boost::get<NullableString>(sv)) {
              const auto s = boost::get<std::string>(vp);
              const auto sval = s ? *s : std::string("");
              if (lhs_type.is_string()) {
                decltype(stringDict->getOrAdd(sval)) sidx;
                {
                  std::unique_lock<std::mutex> lock(temp_mutex_);
                  sidx = stringDict->getOrAdd(sval);
                }
                put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
                tabulate_metadata(lhs_type,
                                  min_int64t_per_thread[c],
                                  max_int64t_per_thread[c],
                                  has_null_per_thread[c],
                                  int64_t(sidx));
              } else if (sval.size() > 0) {
                auto dval = std::atof(sval.data());
                if (lhs_type.is_boolean()) {
                  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
                } else if (lhs_type.is_time()) {
                  throw std::runtime_error(
                      "Date/Time/Timestamp update not supported through translated "
                      "string path.");
                }
                if (lhs_type.is_fp() || lhs_type.is_decimal()) {
                  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_double_per_thread[c],
                                    max_double_per_thread[c],
                                    has_null_per_thread[c],
                                    double(dval));
                } else {
                  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
                  tabulate_metadata(lhs_type,
                                    min_int64t_per_thread[c],
                                    max_int64t_per_thread[c],
                                    has_null_per_thread[c],
                                    int64_t(dval));
                }
              } else {
                put_null(data_ptr, lhs_type, cd->columnName);
                has_null_per_thread[c] = true;
              }
            } else {
              CHECK(false);
            }
          }
        }));
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }
  wait_cleanup_threads(threads);

  // for unit test
  if (FragmentInfo::unconditionalVacuum_) {
    if (cd->isDeletedCol) {
      const auto deleted_offsets = getVacuumOffsets(chunk);
      if (deleted_offsets.size() > 0) {
        compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
        return;
      }
    }
  }
  bool has_null_per_chunk{false};
  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
  double min_double_per_chunk{std::numeric_limits<double>::max()};
  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
  for (size_t c = 0; c < ncore; ++c) {
    has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
    max_double_per_chunk =
        std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
    min_double_per_chunk =
        std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
    max_int64t_per_chunk =
        std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
    min_int64t_per_chunk =
        std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
  }
  updateColumnMetadata(cd,
                       fragment,
                       chunk,
                       has_null_per_chunk,
                       max_double_per_chunk,
                       min_double_per_chunk,
                       max_int64t_per_chunk,
                       min_int64t_per_chunk,
                       cd->columnType,
                       updel_roll);
}

void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
                                                 FragmentInfo& fragment,
                                                 std::shared_ptr<Chunk_NS::Chunk> chunk,
                                                 const bool has_null_per_chunk,
                                                 const double max_double_per_chunk,
                                                 const double min_double_per_chunk,
                                                 const int64_t max_int64t_per_chunk,
                                                 const int64_t min_int64t_per_chunk,
                                                 const SQLTypeInfo& rhs_type,
                                                 UpdelRoll& updel_roll) {
  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
  auto key = std::make_pair(td, &fragment);
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  if (0 == updel_roll.numTuples.count(key)) {
    updel_roll.numTuples[key] = fragment.shadowNumTuples;
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];

  auto buffer = chunk->get_buffer();
  const auto& lhs_type = cd->columnType;

  auto update_stats = [&encoder = buffer->encoder](auto min, auto max, auto has_null) {
    static_assert(std::is_same<decltype(min), decltype(max)>::value,
                  "Type mismatch on min/max");
    if (has_null) {
      encoder->updateStats(decltype(min)(), true);
    }
    if (max < min) {
      // no non-null values were tabulated; leave the range untouched
      return;
    }
    encoder->updateStats(min, false);
    encoder->updateStats(max, false);
  };

  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_fp()) {
    update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
  } else if (lhs_type.is_decimal()) {
    update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
                 (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
                 has_null_per_chunk);
  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
             !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
    update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
  }
  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);

  // removed as @alex suggests; kept commented in case we revisit this once the
  // vacuum code is introduced: fragment.invalidateChunkMetadataMap();
}

void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
                                           const MetaDataKey& key,
                                           UpdelRoll& updel_roll) {
  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
  if (updel_roll.chunkMetadata.count(key)) {
    auto& fragmentInfo = *key.second;
    const auto& chunkMetadata = updel_roll.chunkMetadata[key];
    fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
    fragmentInfo.setChunkMetadataMap(chunkMetadata);
    fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
    fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
    // TODO(ppan): When fragment-level compaction is enabled, the following code
    // should suffice. Until then (i.e. with the existing code), we'll revert to
    // updating InsertOrderFragmenter::varLenColInfo_
    /*
    for (const auto cit : chunkMetadata) {
      const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
      if (cd.columnType.get_size() < 0)
        fragmentInfo.varLenColInfo_[cd.columnId] = cit.second.numBytes;
    }
    */
  }
}

auto InsertOrderFragmenter::getChunksForAllColumns(
    const TableDescriptor* td,
    const FragmentInfo& fragment,
    const Data_Namespace::MemoryLevel memory_level) {
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
  // coming from updateColumn (on the '$delete$' column) we don't have chunks for
  // all columns
  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
    if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
      ++ncol;
      if (!cd->isVirtualCol) {
        auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
        CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{
            catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog_->getDataMgr(),
                                               chunk_key,
                                               memory_level,
                                               0,
                                               chunk_meta_it->second.numBytes,
                                               chunk_meta_it->second.numElements);
        chunks.push_back(chunk);
      }
    }
  }
  return chunks;
}

// get a sorted vector of offsets of rows to vacuum
const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
    const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
  const auto data_buffer = chunk->get_buffer();
  const auto data_addr = data_buffer->getMemoryPtr();
  const size_t nrows_in_chunk = data_buffer->size();
  const size_t ncore = cpu_threads();
  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
  std::vector<std::vector<uint64_t>> deleted_offsets;
  deleted_offsets.resize(ncore);
  std::vector<std::future<void>> threads;
  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
    threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
      const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
      const auto ithread = rbegin / segsz;
      CHECK(ithread < deleted_offsets.size());
      deleted_offsets[ithread].reserve(segsz);
      for (size_t r = rbegin; r < rend; ++r) {
        if (data_addr[r]) {
          deleted_offsets[ithread].push_back(r);
        }
      }
    }));
  }
  wait_cleanup_threads(threads);
  std::vector<uint64_t> all_deleted_offsets;
  for (size_t i = 0; i < ncore; ++i) {
    all_deleted_offsets.insert(
        all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
  }
  return all_deleted_offsets;
}
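// The concatenated result above stays globally sorted: thread i scans the
// contiguous row range [i * segsz, (i + 1) * segsz) and the per-thread vectors
// are appended in thread order. E.g. with segsz = 4 and deleted rows {1, 6},
// thread 0 yields {1}, thread 1 yields {6}, and concatenation gives {1, 6}.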

template <typename T>
static void set_chunk_stats(const SQLTypeInfo& col_type,
                            int8_t* data_addr,
                            int8_t& has_null,
                            T& min,
                            T& max) {
  T v;
  const auto can_be_null = !col_type.get_notnull();
  const auto is_null = get_scalar<T>(data_addr, col_type, v);
  if (is_null) {
    has_null = has_null || (can_be_null && is_null);
  } else {
    set_minmax(min, max, v);
  }
}

static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
                               FragmentInfo& fragment,
                               const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                               const size_t nrows_to_keep,
                               UpdelRoll& updel_roll) {
  auto cd = chunk->get_column_desc();
  auto td = catalog->getMetadataForTable(cd->tableId);
  auto data_buffer = chunk->get_buffer();
  std::lock_guard<std::mutex> lck(updel_roll.mutex);
  const auto key = std::make_pair(td, &fragment);
  if (0 == updel_roll.chunkMetadata.count(key)) {
    updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
  }
  auto& chunkMetadata = updel_roll.chunkMetadata[key];
  chunkMetadata[cd->columnId].numElements = nrows_to_keep;
  chunkMetadata[cd->columnId].numBytes = data_buffer->size();
  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
    updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
  }
}

auto vacuum_fixlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  const auto cd = chunk->get_column_desc();
  const auto& col_type = cd->columnType;
  auto data_buffer = chunk->get_buffer();
  auto data_addr = data_buffer->getMemoryPtr();
  auto element_size =
      col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto nbytes_to_keep = nrows_to_keep * element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_fix_data_to_keep;
}
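// Worked example (a sketch): 5 rows of a 4-byte column with frag_offsets = {1, 3}.
// The surviving single-row blocks at rows 0, 2 and 4 are memmove'd to the front,
// leaving rows {0, 2, 4} contiguous, and the function returns
// 3 rows * 4 bytes = 12 bytes of fixed-length data to keep.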

auto vacuum_varlen_rows(const FragmentInfo& fragment,
                        const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                        const std::vector<uint64_t>& frag_offsets) {
  auto data_buffer = chunk->get_buffer();
  auto index_buffer = chunk->get_index_buf();
  auto data_addr = data_buffer->getMemoryPtr();
  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
  auto index_array = (StringOffsetT*)indices_addr;
  int64_t irow_of_blk_to_keep = 0;  // head of next row block to keep
  int64_t irow_of_blk_to_fill = 0;  // row offset to fit the kept block
  size_t nbytes_fix_data_to_keep = 0;
  size_t nbytes_var_data_to_keep = 0;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
    auto is_last_one = irow == nrows_to_vacuum;
    auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
    auto maddr_to_vacuum = data_addr;
    int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
    if (nrows_to_keep > 0) {
      auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
      auto nbytes_to_keep =
          (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
          index_array[irow_of_blk_to_keep];
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr varlen row block toward front
        memmove(data_addr + ibyte_var_data_to_keep,
                data_addr + index_array[irow_of_blk_to_keep],
                nbytes_to_keep);

        const auto index_base = index_array[irow_of_blk_to_keep];
        for (int64_t i = 0; i < nrows_to_keep; ++i) {
          auto& index = index_array[irow_of_blk_to_keep + i];
          index = ibyte_var_data_to_keep + (index - index_base);
        }
      }
      nbytes_var_data_to_keep += nbytes_to_keep;
      maddr_to_vacuum = indices_addr;

      constexpr static auto index_element_size = sizeof(StringOffsetT);
      nbytes_to_keep = nrows_to_keep * index_element_size;
      if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
        // move curr fixlen row block toward front
        memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
                maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
                nbytes_to_keep);
      }
      irow_of_blk_to_fill += nrows_to_keep;
      nbytes_fix_data_to_keep += nbytes_to_keep;
    }
    irow_of_blk_to_keep = irow_to_vacuum + 1;
  }
  return nbytes_var_data_to_keep;
}
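// Same sweep as vacuum_fixlen_rows(), but payload and offset index must move
// together: each kept block's payload bytes are compacted to the front of the
// data buffer and its index entries are rebased before being compacted
// themselves, i.e. for every kept row,
// index = new_payload_start + (index - old_block_base).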

void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
                                        const TableDescriptor* td,
                                        const int fragment_id,
                                        const std::vector<uint64_t>& frag_offsets,
                                        const Data_Namespace::MemoryLevel memory_level,
                                        UpdelRoll& updel_roll) {
  auto& fragment = getFragmentInfoFromId(fragment_id);
  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
  const auto ncol = chunks.size();

  std::vector<int8_t> has_null_per_thread(ncol, 0);
  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());

  // parallel delete columns
  std::vector<std::future<void>> threads;
  auto nrows_to_vacuum = frag_offsets.size();
  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;

  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    const auto cd = chunk->get_column_desc();
    const auto& col_type = cd->columnType;
    auto data_buffer = chunk->get_buffer();
    auto index_buffer = chunk->get_index_buf();
    auto data_addr = data_buffer->getMemoryPtr();
    auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
    auto index_array = (StringOffsetT*)indices_addr;
    bool is_varlen = col_type.is_varlen_indeed();

    auto fixlen_vacuum = [=,
                          &has_null_per_thread,
                          &max_double_per_thread,
                          &min_double_per_thread,
                          &min_int64t_per_thread,
                          &max_int64t_per_thread,
                          &updel_roll,
                          &frag_offsets,
                          &fragment] {
      size_t nbytes_fix_data_to_keep;
      nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);

      data_buffer->encoder->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_fix_data_to_keep);
      data_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);

      auto daddr = data_addr;
      auto element_size =
          col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
      for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
        if (col_type.is_fixlen_array()) {
          auto encoder =
              dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
          CHECK(encoder);
          encoder->updateMetadata((int8_t*)daddr);
        } else if (col_type.is_fp()) {
          set_chunk_stats(col_type,
                          daddr,  // stats must track the advancing row pointer
                          has_null_per_thread[ci],
                          min_double_per_thread[ci],
                          max_double_per_thread[ci]);
        } else {
          set_chunk_stats(col_type,
                          daddr,
                          has_null_per_thread[ci],
                          min_int64t_per_thread[ci],
                          max_int64t_per_thread[ci]);
        }
      }
    };

    auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
      size_t nbytes_var_data_to_keep;
      nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);

      data_buffer->encoder->setNumElems(nrows_to_keep);
      data_buffer->setSize(nbytes_var_data_to_keep);
      data_buffer->setUpdated();

      index_array[nrows_to_keep] = data_buffer->size();
      index_buffer->setSize(sizeof(*index_array) *
                            (nrows_to_keep ? 1 + nrows_to_keep : 0));
      index_buffer->setUpdated();

      set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
    };

    if (is_varlen) {
      threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
    } else {
      threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
    }
    if (threads.size() >= (size_t)cpu_threads()) {
      wait_cleanup_threads(threads);
    }
  }

  wait_cleanup_threads(threads);

  auto key = std::make_pair(td, &fragment);
  updel_roll.numTuples[key] = nrows_to_keep;
  for (size_t ci = 0; ci < chunks.size(); ++ci) {
    auto chunk = chunks[ci];
    auto cd = chunk->get_column_desc();
    if (!cd->columnType.is_fixlen_array()) {
      updateColumnMetadata(cd,
                           fragment,
                           chunk,
                           has_null_per_thread[ci],
                           max_double_per_thread[ci],
                           min_double_per_thread[ci],
                           max_int64t_per_thread[ci],
                           min_int64t_per_thread[ci],
                           cd->columnType,
                           updel_roll);
    }
  }
}

}  // namespace Fragmenter_Namespace

void UpdelRoll::commitUpdate() {
  if (nullptr == catalog) {
    return;
  }
  const auto td = catalog->getMetadataForTable(logicalTableId);
  CHECK(td);
  // checkpoint all shards regardless, or the epoch becomes out of sync
  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
    catalog->checkpoint(logicalTableId);
  }
  // for each dirty fragment
  for (auto& cm : chunkMetadata) {
    cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
  }
  dirtyChunks.clear();
  // flush gpu dirty chunks if update was not on gpu
  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
    for (const auto& chunkey : dirtyChunkeys) {
      catalog->getDataMgr().deleteChunksWithPrefix(
          chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
    }
  }
}

void UpdelRoll::cancelUpdate() {
  if (nullptr == catalog) {
    return;
  }

  if (is_varlen_update) {
    int databaseId = catalog->getCurrentDB().dbId;
    int32_t tableEpoch = catalog->getTableEpoch(databaseId, logicalTableId);

    dirtyChunks.clear();
    const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpoch(
        databaseId, logicalTableId, tableEpoch);
  } else {
    const auto td = catalog->getMetadataForTable(logicalTableId);
    CHECK(td);
    if (td->persistenceLevel != memoryLevel) {
      for (auto dit : dirtyChunks) {
        catalog->getDataMgr().free(dit.first->get_buffer());
        dit.first->set_buffer(nullptr);
      }
    }
  }
}
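// Typical lifecycle of an UpdelRoll (a hedged sketch; `ok` stands for whatever
// success condition the caller tracks):
//
//   UpdelRoll roll;
//   td->fragmenter->updateColumn(catalog, td, cd, fragment_id, frag_offsets,
//                                rhs_values, rhs_type, memory_level, roll);
//   if (ok) {
//     roll.commitUpdate();  // checkpoint and publish shadow metadata
//   } else {
//     roll.cancelUpdate();  // free dirty buffers / restore the table epoch
//   }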