OmniSciDB  b24e664e58
UpdelStorage.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <boost/variant.hpp>
18 #include <boost/variant/get.hpp>
19 #include <limits>
20 #include <mutex>
21 #include <string>
22 #include <vector>
23 
24 #include "Catalog/Catalog.h"
25 #include "DataMgr/DataMgr.h"
26 #include "DataMgr/FixedLengthArrayNoneEncoder.h"
27 #include "Fragmenter/InsertOrderFragmenter.h"
28 #include "QueryEngine/Execute.h"
29 #include "QueryEngine/TargetValue.h"
30 #include "Shared/ConfigResolve.h"
31 #include "Shared/DateConverters.h"
32 #include "Shared/Logger.h"
33 #include "Shared/TypedDataAccessors.h"
34 #include "Shared/thread_count.h"
35 #include "TargetValueConvertersFactories.h"
36 
37 extern bool g_enable_experimental_string_functions;
38 
39 namespace Fragmenter_Namespace {
40 
41 inline void wait_cleanup_threads(std::vector<std::future<void>>& threads) {
42  for (auto& t : threads) {
43  t.get();
44  }
45  threads.clear();
46 }
47 
48 FragmentInfo& InsertOrderFragmenter::getFragmentInfoFromId(const int fragment_id) {
49  auto fragment_it = std::find_if(
50  fragmentInfoVec_.begin(), fragmentInfoVec_.end(), [=](const auto& f) -> bool {
51  return f.fragmentId == fragment_id;
52  });
53  CHECK(fragment_it != fragmentInfoVec_.end());
54  return *fragment_it;
55 }
56 
57 inline bool is_integral(const SQLTypeInfo& t) {
58  return t.is_integer() || t.is_boolean() || t.is_time() || t.is_timeinterval();
59 }
60 
61 bool FragmentInfo::unconditionalVacuum_{false};
62 
63 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
64  const std::string& tab_name,
65  const std::string& col_name,
66  const int fragment_id,
67  const std::vector<uint64_t>& frag_offsets,
68  const std::vector<ScalarTargetValue>& rhs_values,
69  const SQLTypeInfo& rhs_type,
70  const Data_Namespace::MemoryLevel memory_level,
71  UpdelRoll& updel_roll) {
72  const auto td = catalog->getMetadataForTable(tab_name);
73  CHECK(td);
74  const auto cd = catalog->getMetadataForColumn(td->tableId, col_name);
75  CHECK(cd);
76  td->fragmenter->updateColumn(catalog,
77  td,
78  cd,
79  fragment_id,
80  frag_offsets,
81  rhs_values,
82  rhs_type,
83  memory_level,
84  updel_roll);
85 }
86 
87 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
88  const TableDescriptor* td,
89  const ColumnDescriptor* cd,
90  const int fragment_id,
91  const std::vector<uint64_t>& frag_offsets,
92  const ScalarTargetValue& rhs_value,
93  const SQLTypeInfo& rhs_type,
94  const Data_Namespace::MemoryLevel memory_level,
95  UpdelRoll& updel_roll) {
96  updateColumn(catalog,
97  td,
98  cd,
99  fragment_id,
100  frag_offsets,
101  std::vector<ScalarTargetValue>(1, rhs_value),
102  rhs_type,
103  memory_level,
104  updel_roll);
105 }
106 
107 static int get_chunks(const Catalog_Namespace::Catalog* catalog,
108  const TableDescriptor* td,
109  const FragmentInfo& fragment,
110  const Data_Namespace::MemoryLevel memory_level,
111  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
112  for (int cid = 1, nc = 0; nc < td->nColumns; ++cid) {
113  if (const auto cd = catalog->getMetadataForColumn(td->tableId, cid)) {
114  ++nc;
115  if (!cd->isVirtualCol) {
116  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cid);
117  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
118  ChunkKey chunk_key{
119  catalog->getCurrentDB().dbId, td->tableId, cid, fragment.fragmentId};
120  auto chunk = Chunk_NS::Chunk::getChunk(cd,
121  &catalog->getDataMgr(),
122  chunk_key,
123  memory_level,
124  0,
125  chunk_meta_it->second.numBytes,
126  chunk_meta_it->second.numElements);
127  chunks.push_back(chunk);
128  }
129  }
130  }
131  return chunks.size();
132 }
133 
134 class ChunkToInsertDataConverter {
135  public:
136  virtual ~ChunkToInsertDataConverter(){};
137 
138  virtual void convertToColumnarFormat(size_t row, size_t indexInFragment) = 0;
139 
140  virtual void addDataBlocksToInsertData(
141  Fragmenter_Namespace::InsertData& insertData) = 0;
142 };
143 
144 template <typename BUFFER_DATA_TYPE, typename INSERT_DATA_TYPE>
145 struct ScalarChunkConverter : public ChunkToInsertDataConverter {
146  using ColumnDataPtr =
147      std::unique_ptr<INSERT_DATA_TYPE, CheckedMallocDeleter<INSERT_DATA_TYPE>>;
148 
149  const Chunk_NS::Chunk* chunk_;
150  const ColumnDescriptor* column_descriptor_;
151  ColumnDataPtr column_data_;
152  const BUFFER_DATA_TYPE* data_buffer_addr_;
153 
154  ScalarChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
155  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
156  column_data_ = ColumnDataPtr(reinterpret_cast<INSERT_DATA_TYPE*>(
157  checked_malloc(num_rows * sizeof(INSERT_DATA_TYPE))));
158  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
159  }
160 
161  ~ScalarChunkConverter() override {}
162 
163  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
164  auto buffer_value = data_buffer_addr_[indexInFragment];
165  auto insert_value = static_cast<INSERT_DATA_TYPE>(buffer_value);
166  column_data_.get()[row] = insert_value;
167  }
168 
169  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
170  DataBlockPtr dataBlock;
171  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
172  insertData.data.push_back(dataBlock);
173  insertData.columnIds.push_back(column_descriptor_->columnId);
174  }
175 };
176 
177 struct FixedLenArrayChunkConverter : public ChunkToInsertDataConverter {
178  const Chunk_NS::Chunk* chunk_;
179  const ColumnDescriptor* column_descriptor_;
180 
181  std::unique_ptr<std::vector<ArrayDatum>> column_data_;
182  const int8_t* data_buffer_addr_;
183  size_t fixed_array_length_;
184 
185  FixedLenArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
186  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
187  column_data_ = std::make_unique<std::vector<ArrayDatum>>(num_rows);
188  data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
189  fixed_array_length_ = chunk->get_column_desc()->columnType.get_size();
190  }
191 
192  ~FixedLenArrayChunkConverter() override {}
193 
194  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
195  auto src_value_ptr = data_buffer_addr_ + (indexInFragment * fixed_array_length_);
196  (*column_data_)[row] =
197  ArrayDatum(fixed_array_length_, (int8_t*)src_value_ptr, DoNothingDeleter());
198  }
199 
200  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
201  DataBlockPtr dataBlock;
202  dataBlock.arraysPtr = column_data_.get();
203  insertData.data.push_back(dataBlock);
204  insertData.columnIds.push_back(column_descriptor_->columnId);
205  }
206 };
207 
208 struct ArrayChunkConverter : public FixedLenArrayChunkConverter {
209  const StringOffsetT* index_buffer_addr_;
210 
211  ArrayChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
212  : FixedLenArrayChunkConverter(num_rows, chunk) {
213  index_buffer_addr_ =
214  (StringOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
215  : nullptr);
216  }
217 
218  ~ArrayChunkConverter() override {}
219 
220  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
221  auto startIndex = index_buffer_addr_[indexInFragment];
222  auto endIndex = index_buffer_addr_[indexInFragment + 1];
223  size_t src_value_size = std::abs(endIndex) - std::abs(startIndex);
224  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
225  (*column_data_)[row] = ArrayDatum(
226  src_value_size, (int8_t*)src_value_ptr, endIndex < 0, DoNothingDeleter());
227  }
228 };
229 
230 struct StringChunkConverter : public ChunkToInsertDataConverter {
231  const Chunk_NS::Chunk* chunk_;
232  const ColumnDescriptor* column_descriptor_;
233 
234  std::unique_ptr<std::vector<std::string>> column_data_;
235  const int8_t* data_buffer_addr_;
236  const StringOffsetT* index_buffer_addr_;
237 
238  StringChunkConverter(size_t num_rows, const Chunk_NS::Chunk* chunk)
239  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
240  column_data_ = std::make_unique<std::vector<std::string>>(num_rows);
241  data_buffer_addr_ = chunk->get_buffer()->getMemoryPtr();
242  index_buffer_addr_ =
243  (StringOffsetT*)(chunk->get_index_buf() ? chunk->get_index_buf()->getMemoryPtr()
244  : nullptr);
245  }
246 
247  ~StringChunkConverter() override {}
248 
249  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
250  size_t src_value_size =
251  index_buffer_addr_[indexInFragment + 1] - index_buffer_addr_[indexInFragment];
252  auto src_value_ptr = data_buffer_addr_ + index_buffer_addr_[indexInFragment];
253  (*column_data_)[row] = std::string((const char*)src_value_ptr, src_value_size);
254  }
255 
256  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
257  DataBlockPtr dataBlock;
258  dataBlock.stringsPtr = column_data_.get();
259  insertData.data.push_back(dataBlock);
260  insertData.columnIds.push_back(column_descriptor_->columnId);
261  }
262 };
263 
264 template <typename BUFFER_DATA_TYPE>
265 struct DateChunkConverter : public ChunkToInsertDataConverter {
266  using ColumnDataPtr = std::unique_ptr<int64_t, CheckedMallocDeleter<int64_t>>;
267 
268  const Chunk_NS::Chunk* chunk_;
269  const ColumnDescriptor* column_descriptor_;
270  ColumnDataPtr column_data_;
271  const BUFFER_DATA_TYPE* data_buffer_addr_;
272 
273  DateChunkConverter(const size_t num_rows, const Chunk_NS::Chunk* chunk)
274  : chunk_(chunk), column_descriptor_(chunk->get_column_desc()) {
275  column_data_ = ColumnDataPtr(
276      reinterpret_cast<int64_t*>(checked_malloc(num_rows * sizeof(int64_t))));
277  data_buffer_addr_ = (BUFFER_DATA_TYPE*)chunk->get_buffer()->getMemoryPtr();
278  }
279 
280  ~DateChunkConverter() override {}
281 
282  void convertToColumnarFormat(size_t row, size_t indexInFragment) override {
283  auto buffer_value = data_buffer_addr_[indexInFragment];
284  auto insert_value = static_cast<int64_t>(buffer_value);
285  column_data_.get()[row] = DateConverters::get_epoch_seconds_from_days(insert_value);
286  }
287 
288  void addDataBlocksToInsertData(Fragmenter_Namespace::InsertData& insertData) override {
289  DataBlockPtr dataBlock;
290  dataBlock.numbersPtr = reinterpret_cast<int8_t*>(column_data_.get());
291  insertData.data.push_back(dataBlock);
292  insertData.columnIds.push_back(column_descriptor_->columnId);
293  }
294 };
295 
296 void InsertOrderFragmenter::updateColumns(
297  const Catalog_Namespace::Catalog* catalog,
298  const TableDescriptor* td,
299  const int fragmentId,
300  const std::vector<TargetMetaInfo> sourceMetaInfo,
301  const std::vector<const ColumnDescriptor*> columnDescriptors,
302  const RowDataProvider& sourceDataProvider,
303  const size_t indexOffFragmentOffsetColumn,
304  const Data_Namespace::MemoryLevel memoryLevel,
305  UpdelRoll& updelRoll) {
306  if (!is_feature_enabled<VarlenUpdates>()) {
307  throw std::runtime_error("varlen UPDATE path not enabled.");
308  }
309 
310  updelRoll.is_varlen_update = true;
311  updelRoll.catalog = catalog;
312  updelRoll.logicalTableId = catalog->getLogicalTableId(td->tableId);
313  updelRoll.memoryLevel = memoryLevel;
314 
315  size_t num_entries = sourceDataProvider.getEntryCount();
316  size_t num_rows = sourceDataProvider.getRowCount();
317 
318  if (0 == num_rows) {
319  // bail out early
320  return;
321  }
322 
323  TargetValueConverterFactory factory;
324 
325  auto& fragment = getFragmentInfoFromId(fragmentId);
326  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
327  get_chunks(catalog, td, fragment, memoryLevel, chunks);
328  std::vector<std::unique_ptr<TargetValueConverter>> sourceDataConverters(
329  columnDescriptors.size());
330  std::vector<std::unique_ptr<ChunkToInsertDataConverter>> chunkConverters;
331  std::shared_ptr<Executor> executor;
332 
333  if (g_enable_experimental_string_functions) {
334  executor = Executor::getExecutor(catalog->getCurrentDB().dbId);
335  }
336 
337  std::shared_ptr<Chunk_NS::Chunk> deletedChunk;
338  for (size_t indexOfChunk = 0; indexOfChunk < chunks.size(); indexOfChunk++) {
339  auto chunk = chunks[indexOfChunk];
340  const auto chunk_cd = chunk->get_column_desc();
341 
342  if (chunk_cd->isDeletedCol) {
343  deletedChunk = chunk;
344  continue;
345  }
346 
347  auto targetColumnIt = std::find_if(columnDescriptors.begin(),
348  columnDescriptors.end(),
349  [=](const ColumnDescriptor* cd) -> bool {
350  return cd->columnId == chunk_cd->columnId;
351  });
352 
353  if (targetColumnIt != columnDescriptors.end()) {
354  auto indexOfTargetColumn = std::distance(columnDescriptors.begin(), targetColumnIt);
355 
356  auto sourceDataMetaInfo = sourceMetaInfo[indexOfTargetColumn];
357  auto targetDescriptor = columnDescriptors[indexOfTargetColumn];
358 
359  ConverterCreateParameter param{
360  num_rows,
361  *catalog,
362  sourceDataMetaInfo,
363  targetDescriptor,
364  targetDescriptor->columnType,
365  !targetDescriptor->columnType.get_notnull(),
366  sourceDataProvider.getLiteralDictionary(),
367  g_enable_experimental_string_functions
368  ? executor->getStringDictionaryProxy(
369  sourceDataMetaInfo.get_type_info().get_comp_param(),
370  executor->getRowSetMemoryOwner(),
371  true)
372  : nullptr};
373  auto converter = factory.create(param);
374  sourceDataConverters[indexOfTargetColumn] = std::move(converter);
375 
376  if (targetDescriptor->columnType.is_geometry()) {
377  // geometry columns are composites
378  // need to skip chunks, depending on geo type
379  switch (targetDescriptor->columnType.get_type()) {
380  case kMULTIPOLYGON:
381  indexOfChunk += 5;
382  break;
383  case kPOLYGON:
384  indexOfChunk += 4;
385  break;
386  case kLINESTRING:
387  indexOfChunk += 2;
388  break;
389  case kPOINT:
390  indexOfChunk += 1;
391  break;
392  default:
393  CHECK(false); // not supported
394  }
395  }
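// Illustrative note (an assumption based on the usual OmniSci geo storage
// layout, not stated in this file): each logical geo column is followed in
// column order by its physical companion columns, which is what the skip
// counts above reflect, e.g.
//   POINT        -> coords                                   (skip 1)
//   LINESTRING   -> coords, bounds                           (skip 2)
//   POLYGON      -> coords, ring_sizes, bounds, render_group (skip 4)
//   MULTIPOLYGON -> coords, ring_sizes, poly_rings, bounds, render_group (skip 5)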
396  } else {
397  if (chunk_cd->columnType.is_varlen() || chunk_cd->columnType.is_fixlen_array()) {
398  std::unique_ptr<ChunkToInsertDataConverter> converter;
399 
400  if (chunk_cd->columnType.is_fixlen_array()) {
401  converter =
402  std::make_unique<FixedLenArrayChunkConverter>(num_rows, chunk.get());
403  } else if (chunk_cd->columnType.is_string()) {
404  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
405  } else if (chunk_cd->columnType.is_geometry()) {
406  // the logical geo column is a string column
407  converter = std::make_unique<StringChunkConverter>(num_rows, chunk.get());
408  } else {
409  converter = std::make_unique<ArrayChunkConverter>(num_rows, chunk.get());
410  }
411 
412  chunkConverters.push_back(std::move(converter));
413 
414  } else if (chunk_cd->columnType.is_date_in_days()) {
415  /* Q: Why do we need this?
416  A: On the variable-length UPDATE path we move the chunk content of a column
417  without decoding it. Because that content passes through DateDaysEncoder
418  again, the encoder expects values in seconds, but here they are still in
419  days. DateChunkConverter therefore scales the chunk values to seconds,
420  which DateDaysEncoder then re-encodes back into days.
421  */
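// A worked example of the scaling described above, assuming
// get_epoch_seconds_from_days simply multiplies by 86400 (seconds per day):
// a 2-byte date-in-days chunk holding 18000 becomes
// 18000 * 86400 = 1555200000 epoch seconds here, and DateDaysEncoder
// divides by 86400 on re-encode, landing back at 18000 days.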
422  std::unique_ptr<ChunkToInsertDataConverter> converter;
423  const size_t physical_size = chunk_cd->columnType.get_size();
424  if (physical_size == 2) {
425  converter =
426  std::make_unique<DateChunkConverter<int16_t>>(num_rows, chunk.get());
427  } else if (physical_size == 4) {
428  converter =
429  std::make_unique<DateChunkConverter<int32_t>>(num_rows, chunk.get());
430  } else {
431  CHECK(false);
432  }
433  chunkConverters.push_back(std::move(converter));
434  } else {
435  std::unique_ptr<ChunkToInsertDataConverter> converter;
436  SQLTypeInfo logical_type = get_logical_type_info(chunk_cd->columnType);
437  int logical_size = logical_type.get_size();
438  int physical_size = chunk_cd->columnType.get_size();
439 
440  if (logical_type.is_string()) {
441  // for dicts -> logical = physical
442  logical_size = physical_size;
443  }
444 
445  if (8 == physical_size) {
446  converter = std::make_unique<ScalarChunkConverter<int64_t, int64_t>>(
447  num_rows, chunk.get());
448  } else if (4 == physical_size) {
449  if (8 == logical_size) {
450  converter = std::make_unique<ScalarChunkConverter<int32_t, int64_t>>(
451  num_rows, chunk.get());
452  } else {
453  converter = std::make_unique<ScalarChunkConverter<int32_t, int32_t>>(
454  num_rows, chunk.get());
455  }
456  } else if (2 == chunk_cd->columnType.get_size()) {
457  if (8 == logical_size) {
458  converter = std::make_unique<ScalarChunkConverter<int16_t, int64_t>>(
459  num_rows, chunk.get());
460  } else if (4 == logical_size) {
461  converter = std::make_unique<ScalarChunkConverter<int16_t, int32_t>>(
462  num_rows, chunk.get());
463  } else {
464  converter = std::make_unique<ScalarChunkConverter<int16_t, int16_t>>(
465  num_rows, chunk.get());
466  }
467  } else if (1 == chunk_cd->columnType.get_size()) {
468  if (8 == logical_size) {
469  converter = std::make_unique<ScalarChunkConverter<int8_t, int64_t>>(
470  num_rows, chunk.get());
471  } else if (4 == logical_size) {
472  converter = std::make_unique<ScalarChunkConverter<int8_t, int32_t>>(
473  num_rows, chunk.get());
474  } else if (2 == logical_size) {
475  converter = std::make_unique<ScalarChunkConverter<int8_t, int16_t>>(
476  num_rows, chunk.get());
477  } else {
478  converter = std::make_unique<ScalarChunkConverter<int8_t, int8_t>>(
479  num_rows, chunk.get());
480  }
481  } else {
482  CHECK(false); // unknown
483  }
484 
485  chunkConverters.push_back(std::move(converter));
486  }
487  }
488  }
489 
490  static boost_variant_accessor<ScalarTargetValue> SCALAR_TARGET_VALUE_ACCESSOR;
491  static boost_variant_accessor<int64_t> OFFSET_VALUE__ACCESSOR;
492 
493  updelRoll.dirtyChunks[deletedChunk.get()] = deletedChunk;
494  ChunkKey chunkey{updelRoll.catalog->getCurrentDB().dbId,
495  deletedChunk->get_column_desc()->tableId,
496  deletedChunk->get_column_desc()->columnId,
497  fragment.fragmentId};
498  updelRoll.dirtyChunkeys.insert(chunkey);
499  bool* deletedChunkBuffer =
500  reinterpret_cast<bool*>(deletedChunk->get_buffer()->getMemoryPtr());
501 
502  std::atomic<size_t> row_idx{0};
503 
504  auto row_converter = [&sourceDataProvider,
505  &sourceDataConverters,
506  &indexOffFragmentOffsetColumn,
507  &chunkConverters,
508  &deletedChunkBuffer,
509  &row_idx](size_t indexOfEntry) -> void {
510  // convert the source data
511  const auto row = sourceDataProvider.getEntryAt(indexOfEntry);
512  if (row.empty()) {
513  return;
514  }
515 
516  size_t indexOfRow = row_idx.fetch_add(1);
517 
518  for (size_t col = 0; col < sourceDataConverters.size(); col++) {
519  if (sourceDataConverters[col]) {
520  const auto& mapd_variant = row[col];
521  sourceDataConverters[col]->convertToColumnarFormat(indexOfRow, &mapd_variant);
522  }
523  }
524 
525  auto scalar = checked_get(
526  indexOfRow, &row[indexOffFragmentOffsetColumn], SCALAR_TARGET_VALUE_ACCESSOR);
527  auto indexInChunkBuffer = *checked_get(indexOfRow, scalar, OFFSET_VALUE__ACCESSOR);
528 
529  // convert the remaining chunks
530  for (size_t idx = 0; idx < chunkConverters.size(); idx++) {
531  chunkConverters[idx]->convertToColumnarFormat(indexOfRow, indexInChunkBuffer);
532  }
533 
534  // now mark the row as deleted
535  deletedChunkBuffer[indexInChunkBuffer] = true;
536  };
537 
538  bool can_go_parallel = num_rows > 20000;
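// The fan-out below is plain ceiling division; e.g. 10 entries on 4 worker
// threads gives stride = (10 + 4 - 1) / 4 = 3, i.e. the entry ranges
// [0,3), [3,6), [6,9), [9,10) -- the tail thread simply runs a shorter range.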
539 
540  if (can_go_parallel) {
541  const size_t num_worker_threads = cpu_threads();
542  std::vector<std::future<void>> worker_threads;
543  for (size_t i = 0,
544  start_entry = 0,
545  stride = (num_entries + num_worker_threads - 1) / num_worker_threads;
546  i < num_worker_threads && start_entry < num_entries;
547  ++i, start_entry += stride) {
548  const auto end_entry = std::min(start_entry + stride, num_entries);
549  worker_threads.push_back(std::async(
550  std::launch::async,
551  [&row_converter](const size_t start, const size_t end) {
552  for (size_t indexOfRow = start; indexOfRow < end; ++indexOfRow) {
553  row_converter(indexOfRow);
554  }
555  },
556  start_entry,
557  end_entry));
558  }
559 
560  for (auto& child : worker_threads) {
561  child.wait();
562  }
563 
564  } else {
565  for (size_t entryIdx = 0; entryIdx < num_entries; entryIdx++) {
566  row_converter(entryIdx);
567  }
568  }
569 
570  Fragmenter_Namespace::InsertData insert_data;
571  insert_data.databaseId = catalog->getCurrentDB().dbId;
572  insert_data.tableId = td->tableId;
573 
574  for (size_t i = 0; i < chunkConverters.size(); i++) {
575  chunkConverters[i]->addDataBlocksToInsertData(insert_data);
576  continue;
577  }
578 
579  for (size_t i = 0; i < sourceDataConverters.size(); i++) {
580  if (sourceDataConverters[i]) {
581  sourceDataConverters[i]->addDataBlocksToInsertData(insert_data);
582  }
583  continue;
584  }
585 
586  insert_data.numRows = num_rows;
587  insertDataNoCheckpoint(insert_data);
588 
589  // update metadata
590  if (!deletedChunk->get_buffer()->has_encoder) {
591  deletedChunk->init_encoder();
592  }
593  deletedChunk->get_buffer()->encoder->updateStats(static_cast<int64_t>(true), false);
594 
595  auto& shadowDeletedChunkMeta =
596  fragment.shadowChunkMetadataMap[deletedChunk->get_column_desc()->columnId];
597  if (shadowDeletedChunkMeta.numElements >
598  deletedChunk->get_buffer()->encoder->getNumElems()) {
599  // the append will have populated shadow meta data, otherwise use existing num
600  // elements
601  deletedChunk->get_buffer()->encoder->setNumElems(shadowDeletedChunkMeta.numElements);
602  }
603  deletedChunk->get_buffer()->setUpdated();
604 }
605 
606 void InsertOrderFragmenter::updateColumn(const Catalog_Namespace::Catalog* catalog,
607  const TableDescriptor* td,
608  const ColumnDescriptor* cd,
609  const int fragment_id,
610  const std::vector<uint64_t>& frag_offsets,
611  const std::vector<ScalarTargetValue>& rhs_values,
612  const SQLTypeInfo& rhs_type,
613  const Data_Namespace::MemoryLevel memory_level,
614  UpdelRoll& updel_roll) {
615  updel_roll.catalog = catalog;
616  updel_roll.logicalTableId = catalog->getLogicalTableId(td->tableId);
617  updel_roll.memoryLevel = memory_level;
618 
619  const size_t ncore = cpu_threads();
620  const auto nrow = frag_offsets.size();
621  const auto n_rhs_values = rhs_values.size();
622  if (0 == nrow) {
623  return;
624  }
625  CHECK(nrow == n_rhs_values || 1 == n_rhs_values);
626 
627  auto& fragment = getFragmentInfoFromId(fragment_id);
628  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(cd->columnId);
629  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
630  ChunkKey chunk_key{
631  catalog->getCurrentDB().dbId, td->tableId, cd->columnId, fragment.fragmentId};
632  auto chunk = Chunk_NS::Chunk::getChunk(cd,
633  &catalog->getDataMgr(),
634  chunk_key,
635  memory_level,
636  0,
637  chunk_meta_it->second.numBytes,
638  chunk_meta_it->second.numElements);
639 
640  std::vector<int8_t> has_null_per_thread(ncore, 0);
641  std::vector<double> max_double_per_thread(ncore, std::numeric_limits<double>::lowest());
642  std::vector<double> min_double_per_thread(ncore, std::numeric_limits<double>::max());
643  std::vector<int64_t> max_int64t_per_thread(ncore, std::numeric_limits<int64_t>::min());
644  std::vector<int64_t> min_int64t_per_thread(ncore, std::numeric_limits<int64_t>::max());
645 
646  // parallel update elements
647  std::vector<std::future<void>> threads;
648 
649  const auto segsz = (nrow + ncore - 1) / ncore;
650  auto dbuf = chunk->get_buffer();
651  auto dbuf_addr = dbuf->getMemoryPtr();
652  dbuf->setUpdated();
653  {
654  std::lock_guard<std::mutex> lck(updel_roll.mutex);
655  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
656  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
657  }
658 
659  ChunkKey chunkey{updel_roll.catalog->getCurrentDB().dbId,
660  cd->tableId,
661  cd->columnId,
662  fragment.fragmentId};
663  updel_roll.dirtyChunkeys.insert(chunkey);
664  }
665  for (size_t rbegin = 0, c = 0; rbegin < nrow; ++c, rbegin += segsz) {
666  threads.emplace_back(std::async(
667  std::launch::async,
668  [=,
669  &has_null_per_thread,
670  &min_int64t_per_thread,
671  &max_int64t_per_thread,
672  &min_double_per_thread,
673  &max_double_per_thread,
674  &frag_offsets,
675  &rhs_values] {
676  SQLTypeInfo lhs_type = cd->columnType;
677 
678  // !! Not sure if this is an undocumented convention or a bug, but for a sharded
679  // table the dictionary id of an encoded string column is not specified by
680  // comp_param in the physical table but somehow in the logical table: comp_param
681  // in the physical table is always 0, so we need to adapt accordingly.
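// Hypothetical illustration of the workaround (the shard count and dictionary
// id below are made up): with 4 shards, a shard's physical ColumnDescriptor
// may report comp_param == 0 while the logical table's descriptor for the same
// column carries the real dictionary id, say comp_param == 3; cdl below
// re-resolves the column through getLogicalTableId() so the dictionary lookup
// uses the right id.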
682  auto cdl = (shard_ < 0)
683  ? cd
684  : catalog->getMetadataForColumn(
685  catalog->getLogicalTableId(td->tableId), cd->columnId);
686  CHECK(cdl);
687  DecimalOverflowValidator decimalOverflowValidator(lhs_type);
688  NullAwareValidator<DecimalOverflowValidator> nullAwareDecimalOverflowValidator(
689  lhs_type, &decimalOverflowValidator);
690  DateDaysOverflowValidator dateDaysOverflowValidator(lhs_type);
691  NullAwareValidator<DateDaysOverflowValidator> nullAwareDateOverflowValidator(
692  lhs_type, &dateDaysOverflowValidator);
693 
694  StringDictionary* stringDict{nullptr};
695  if (lhs_type.is_string()) {
696  CHECK(kENCODING_DICT == lhs_type.get_compression());
697  auto dictDesc = const_cast<DictDescriptor*>(
698  catalog->getMetadataForDict(cdl->columnType.get_comp_param()));
699  CHECK(dictDesc);
700  stringDict = dictDesc->stringDict.get();
701  CHECK(stringDict);
702  }
703 
704  for (size_t r = rbegin; r < std::min(rbegin + segsz, nrow); r++) {
705  const auto roffs = frag_offsets[r];
706  auto data_ptr = dbuf_addr + roffs * get_element_size(lhs_type);
707  auto sv = &rhs_values[1 == n_rhs_values ? 0 : r];
708  ScalarTargetValue sv2;
709 
710  // A subtlety arises in the two cases of string-to-string assignment, when
711  // upstream passes the RHS string as a string index instead of a preferred "real
712  // string".
713  // case #1. For "SET str_col = str_literal", it is hard to resolve the temp
714  // string index
715  // in this layer, so if upstream passes a string index here, an
716  // exception is thrown.
717  // case #2. For "SET str_col1 = str_col2", the RHS string index is converted
718  // to an LHS string index.
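// Hypothetical SQL for the two cases above (table and column names are made up):
//   case #1: UPDATE t SET str_col = 'foo';      -- the RHS may arrive as a
//            transient dictionary index; throws below if no dictionary
//            metadata can be resolved for it.
//   case #2: UPDATE t SET str_col1 = str_col2;  -- the RHS index is read back
//            as a real string here, then re-encoded into the LHS dictionary
//            further down.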
719  if (rhs_type.is_string()) {
720  if (const auto vp = boost::get<int64_t>(sv)) {
721  auto dictDesc = const_cast<DictDescriptor*>(
722  catalog->getMetadataForDict(rhs_type.get_comp_param()));
723  if (nullptr == dictDesc) {
724  throw std::runtime_error(
725  "UPDATE does not support cast from string literal to string "
726  "column.");
727  }
728  auto stringDict = dictDesc->stringDict.get();
729  CHECK(stringDict);
730  sv2 = NullableString(stringDict->getString(*vp));
731  sv = &sv2;
732  }
733  }
734 
735  if (const auto vp = boost::get<int64_t>(sv)) {
736  auto v = *vp;
737  if (lhs_type.is_string()) {
738  throw std::runtime_error("UPDATE does not support cast to string.");
739  }
740  put_scalar<int64_t>(data_ptr, lhs_type, v, cd->columnName, &rhs_type);
741  if (lhs_type.is_decimal()) {
742  nullAwareDecimalOverflowValidator.validate<int64_t>(v);
743  int64_t decimal_val;
744  get_scalar<int64_t>(data_ptr, lhs_type, decimal_val);
745  tabulate_metadata(lhs_type,
746  min_int64t_per_thread[c],
747  max_int64t_per_thread[c],
748  has_null_per_thread[c],
749  (v == inline_int_null_value<int64_t>() &&
750  lhs_type.get_notnull() == false)
751  ? v
752  : decimal_val);
753  auto const positive_v_and_negative_d = (v >= 0) && (decimal_val < 0);
754  auto const negative_v_and_positive_d = (v < 0) && (decimal_val >= 0);
755  if (positive_v_and_negative_d || negative_v_and_positive_d) {
756  throw std::runtime_error(
757  "Data conversion overflow on " + std::to_string(v) +
758  " from DECIMAL(" + std::to_string(rhs_type.get_dimension()) + ", " +
759  std::to_string(rhs_type.get_scale()) + ") to (" +
760  std::to_string(lhs_type.get_dimension()) + ", " +
761  std::to_string(lhs_type.get_scale()) + ")");
762  }
763  } else if (is_integral(lhs_type)) {
764  if (lhs_type.is_date_in_days()) {
765  // Store meta values in seconds
766  if (lhs_type.get_size() == 2) {
767  nullAwareDateOverflowValidator.validate<int16_t>(v);
768  } else {
769  nullAwareDateOverflowValidator.validate<int32_t>(v);
770  }
771  int64_t days;
772  get_scalar<int64_t>(data_ptr, lhs_type, days);
773  const auto seconds = DateConverters::get_epoch_seconds_from_days(days);
774  tabulate_metadata(lhs_type,
775  min_int64t_per_thread[c],
776  max_int64t_per_thread[c],
777  has_null_per_thread[c],
778  (v == inline_int_null_value<int64_t>() &&
779  lhs_type.get_notnull() == false)
780  ? NullSentinelSupplier()(lhs_type, v)
781  : seconds);
782  } else {
783  int64_t target_value;
784  if (rhs_type.is_decimal()) {
785  target_value = round(decimal_to_double(rhs_type, v));
786  } else {
787  target_value = v;
788  }
789  tabulate_metadata(lhs_type,
790  min_int64t_per_thread[c],
791  max_int64t_per_thread[c],
792  has_null_per_thread[c],
793  target_value);
794  }
795  } else {
797  lhs_type,
798  min_double_per_thread[c],
799  max_double_per_thread[c],
800  has_null_per_thread[c],
801  rhs_type.is_decimal() ? decimal_to_double(rhs_type, v) : v);
802  }
803  } else if (const auto vp = boost::get<double>(sv)) {
804  auto v = *vp;
805  if (lhs_type.is_string()) {
806  throw std::runtime_error("UPDATE does not support cast to string.");
807  }
808  put_scalar<double>(data_ptr, lhs_type, v, cd->columnName);
809  if (lhs_type.is_integer()) {
810  tabulate_metadata(lhs_type,
811  min_int64t_per_thread[c],
812  max_int64t_per_thread[c],
813  has_null_per_thread[c],
814  int64_t(v));
815  } else if (lhs_type.is_fp()) {
816  tabulate_metadata(lhs_type,
817  min_double_per_thread[c],
818  max_double_per_thread[c],
819  has_null_per_thread[c],
820  double(v));
821  } else {
822  UNREACHABLE() << "Unexpected combination of a non-integer, non-floating-point "
823  "LHS with a floating-point RHS.";
824  }
825  } else if (const auto vp = boost::get<float>(sv)) {
826  auto v = *vp;
827  if (lhs_type.is_string()) {
828  throw std::runtime_error("UPDATE does not support cast to string.");
829  }
830  put_scalar<float>(data_ptr, lhs_type, v, cd->columnName);
831  if (lhs_type.is_integer()) {
832  tabulate_metadata(lhs_type,
833  min_int64t_per_thread[c],
834  max_int64t_per_thread[c],
835  has_null_per_thread[c],
836  int64_t(v));
837  } else {
838  tabulate_metadata(lhs_type,
839  min_double_per_thread[c],
840  max_double_per_thread[c],
841  has_null_per_thread[c],
842  double(v));
843  }
844  } else if (const auto vp = boost::get<NullableString>(sv)) {
845  const auto s = boost::get<std::string>(vp);
846  const auto sval = s ? *s : std::string("");
847  if (lhs_type.is_string()) {
848  decltype(stringDict->getOrAdd(sval)) sidx;
849  {
850  std::unique_lock<std::mutex> lock(temp_mutex_);
851  sidx = stringDict->getOrAdd(sval);
852  }
853  put_scalar<int64_t>(data_ptr, lhs_type, sidx, cd->columnName);
854  tabulate_metadata(lhs_type,
855  min_int64t_per_thread[c],
856  max_int64t_per_thread[c],
857  has_null_per_thread[c],
858  int64_t(sidx));
859  } else if (sval.size() > 0) {
860  auto dval = std::atof(sval.data());
861  if (lhs_type.is_boolean()) {
862  dval = sval == "t" || sval == "true" || sval == "T" || sval == "True";
863  } else if (lhs_type.is_time()) {
864  throw std::runtime_error(
865  "Date/Time/Timestamp update not supported through translated "
866  "string path.");
867  }
868  if (lhs_type.is_fp() || lhs_type.is_decimal()) {
869  put_scalar<double>(data_ptr, lhs_type, dval, cd->columnName);
870  tabulate_metadata(lhs_type,
871  min_double_per_thread[c],
872  max_double_per_thread[c],
873  has_null_per_thread[c],
874  double(dval));
875  } else {
876  put_scalar<int64_t>(data_ptr, lhs_type, dval, cd->columnName);
877  tabulate_metadata(lhs_type,
878  min_int64t_per_thread[c],
879  max_int64t_per_thread[c],
880  has_null_per_thread[c],
881  int64_t(dval));
882  }
883  } else {
884  put_null(data_ptr, lhs_type, cd->columnName);
885  has_null_per_thread[c] = true;
886  }
887  } else {
888  CHECK(false);
889  }
890  }
891  }));
892  if (threads.size() >= (size_t)cpu_threads()) {
893  wait_cleanup_threads(threads);
894  }
895  }
896  wait_cleanup_threads(threads);
897 
898  // for unit test
899  if (Fragmenter_Namespace::FragmentInfo::unconditionalVacuum_) {
900  if (cd->isDeletedCol) {
901  const auto deleted_offsets = getVacuumOffsets(chunk);
902  if (deleted_offsets.size() > 0) {
903  compactRows(catalog, td, fragment_id, deleted_offsets, memory_level, updel_roll);
904  return;
905  }
906  }
907  }
908  bool has_null_per_chunk{false};
909  double max_double_per_chunk{std::numeric_limits<double>::lowest()};
910  double min_double_per_chunk{std::numeric_limits<double>::max()};
911  int64_t max_int64t_per_chunk{std::numeric_limits<int64_t>::min()};
912  int64_t min_int64t_per_chunk{std::numeric_limits<int64_t>::max()};
913  for (size_t c = 0; c < ncore; ++c) {
914  has_null_per_chunk = has_null_per_chunk || has_null_per_thread[c];
915  max_double_per_chunk =
916  std::max<double>(max_double_per_chunk, max_double_per_thread[c]);
917  min_double_per_chunk =
918  std::min<double>(min_double_per_chunk, min_double_per_thread[c]);
919  max_int64t_per_chunk =
920  std::max<int64_t>(max_int64t_per_chunk, max_int64t_per_thread[c]);
921  min_int64t_per_chunk =
922  std::min<int64_t>(min_int64t_per_chunk, min_int64t_per_thread[c]);
923  }
924  updateColumnMetadata(cd,
925  fragment,
926  chunk,
927  has_null_per_chunk,
928  max_double_per_chunk,
929  min_double_per_chunk,
930  max_int64t_per_chunk,
931  min_int64t_per_chunk,
932  cd->columnType,
933  updel_roll);
934 }
935 
936 void InsertOrderFragmenter::updateColumnMetadata(const ColumnDescriptor* cd,
937  FragmentInfo& fragment,
938  std::shared_ptr<Chunk_NS::Chunk> chunk,
939  const bool has_null_per_chunk,
940  const double max_double_per_chunk,
941  const double min_double_per_chunk,
942  const int64_t max_int64t_per_chunk,
943  const int64_t min_int64t_per_chunk,
944  const SQLTypeInfo& rhs_type,
945  UpdelRoll& updel_roll) {
946  auto td = updel_roll.catalog->getMetadataForTable(cd->tableId);
947  auto key = std::make_pair(td, &fragment);
948  std::lock_guard<std::mutex> lck(updel_roll.mutex);
949  if (0 == updel_roll.chunkMetadata.count(key)) {
950  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
951  }
952  if (0 == updel_roll.numTuples.count(key)) {
953  updel_roll.numTuples[key] = fragment.shadowNumTuples;
954  }
955  auto& chunkMetadata = updel_roll.chunkMetadata[key];
956 
957  auto buffer = chunk->get_buffer();
958  const auto& lhs_type = cd->columnType;
959 
960  auto update_stats = [& encoder = buffer->encoder](auto min, auto max, auto has_null) {
961  static_assert(std::is_same<decltype(min), decltype(max)>::value,
962  "Type mismatch on min/max");
963  if (has_null) {
964  encoder->updateStats(decltype(min)(), true);
965  }
966  if (max < min) {
967  return;
968  }
969  encoder->updateStats(min, false);
970  encoder->updateStats(max, false);
971  };
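// Note the sentinel convention the lambda relies on: the per-chunk min/max
// start at numeric_limits max()/lowest(), so if every updated value was NULL
// nothing was tabulated, max < min still holds, and only the null flag is
// recorded -- the chunk's previous min/max stats are left untouched.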
972 
973  if (is_integral(lhs_type) || (lhs_type.is_decimal() && rhs_type.is_decimal())) {
974  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
975  } else if (lhs_type.is_fp()) {
976  update_stats(min_double_per_chunk, max_double_per_chunk, has_null_per_chunk);
977  } else if (lhs_type.is_decimal()) {
978  update_stats((int64_t)(min_double_per_chunk * pow(10, lhs_type.get_scale())),
979  (int64_t)(max_double_per_chunk * pow(10, lhs_type.get_scale())),
980  has_null_per_chunk);
981  } else if (!lhs_type.is_array() && !lhs_type.is_geometry() &&
982  !(lhs_type.is_string() && kENCODING_DICT != lhs_type.get_compression())) {
983  update_stats(min_int64t_per_chunk, max_int64t_per_chunk, has_null_per_chunk);
984  }
985  buffer->encoder->getMetadata(chunkMetadata[cd->columnId]);
986 
987  // Removed as @alex suggested. Keep it commented in case there is a chance to
988  // revisit it once the vacuum code is introduced. fragment.invalidateChunkMetadataMap();
989 }
990 
991 void InsertOrderFragmenter::updateMetadata(const Catalog_Namespace::Catalog* catalog,
992  const MetaDataKey& key,
993  UpdelRoll& updel_roll) {
994  mapd_unique_lock<mapd_shared_mutex> writeLock(fragmentInfoMutex_);
995  if (updel_roll.chunkMetadata.count(key)) {
996  auto& fragmentInfo = *key.second;
997  const auto& chunkMetadata = updel_roll.chunkMetadata[key];
998  fragmentInfo.shadowChunkMetadataMap = chunkMetadata;
999  fragmentInfo.setChunkMetadataMap(chunkMetadata);
1000  fragmentInfo.shadowNumTuples = updel_roll.numTuples[key];
1001  fragmentInfo.setPhysicalNumTuples(fragmentInfo.shadowNumTuples);
1002  // TODO(ppan): When fragment-level compaction is enabled, the following code
1003  // should suffice. When not (ie. existing code), we'll revert to updating
1004  // InsertOrderFragmenter::varLenColInfo_
1005  /*
1006  for (const auto cit : chunkMetadata) {
1007  const auto& cd = *catalog->getMetadataForColumn(td->tableId, cit.first);
1008  if (cd.columnType.get_size() < 0)
1009  fragmentInfo.varLenColInfox[cd.columnId] = cit.second.numBytes;
1010  }
1011  */
1012  }
1013 }
1014 
1016  const TableDescriptor* td,
1017  const FragmentInfo& fragment,
1018  const Data_Namespace::MemoryLevel memory_level) {
1019  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks;
1020  // coming from updateColumn (on '$delete$' column) we don't have chunks for all columns
1021  for (int col_id = 1, ncol = 0; ncol < td->nColumns; ++col_id) {
1022  if (const auto cd = catalog_->getMetadataForColumn(td->tableId, col_id)) {
1023  ++ncol;
1024  if (!cd->isVirtualCol) {
1025  auto chunk_meta_it = fragment.getChunkMetadataMapPhysical().find(col_id);
1026  CHECK(chunk_meta_it != fragment.getChunkMetadataMapPhysical().end());
1027  ChunkKey chunk_key{
1028  catalog_->getCurrentDB().dbId, td->tableId, col_id, fragment.fragmentId};
1029  auto chunk = Chunk_NS::Chunk::getChunk(cd,
1030  &catalog_->getDataMgr(),
1031  chunk_key,
1032  memory_level,
1033  0,
1034  chunk_meta_it->second.numBytes,
1035  chunk_meta_it->second.numElements);
1036  chunks.push_back(chunk);
1037  }
1038  }
1039  }
1040  return chunks;
1041 }
1042 
1043 // get a sorted vector of offsets of rows to vacuum; each thread scans a disjoint ascending range, so concatenating per-thread results in thread order keeps the offsets globally sorted
1044 const std::vector<uint64_t> InsertOrderFragmenter::getVacuumOffsets(
1045  const std::shared_ptr<Chunk_NS::Chunk>& chunk) {
1046  const auto data_buffer = chunk->get_buffer();
1047  const auto data_addr = data_buffer->getMemoryPtr();
1048  const size_t nrows_in_chunk = data_buffer->size();
1049  const size_t ncore = cpu_threads();
1050  const size_t segsz = (nrows_in_chunk + ncore - 1) / ncore;
1051  std::vector<std::vector<uint64_t>> deleted_offsets;
1052  deleted_offsets.resize(ncore);
1053  std::vector<std::future<void>> threads;
1054  for (size_t rbegin = 0; rbegin < nrows_in_chunk; rbegin += segsz) {
1055  threads.emplace_back(std::async(std::launch::async, [=, &deleted_offsets] {
1056  const auto rend = std::min<size_t>(rbegin + segsz, nrows_in_chunk);
1057  const auto ithread = rbegin / segsz;
1058  CHECK(ithread < deleted_offsets.size());
1059  deleted_offsets[ithread].reserve(segsz);
1060  for (size_t r = rbegin; r < rend; ++r) {
1061  if (data_addr[r]) {
1062  deleted_offsets[ithread].push_back(r);
1063  }
1064  }
1065  }));
1066  }
1067  wait_cleanup_threads(threads);
1068  std::vector<uint64_t> all_deleted_offsets;
1069  for (size_t i = 0; i < ncore; ++i) {
1070  all_deleted_offsets.insert(
1071  all_deleted_offsets.end(), deleted_offsets[i].begin(), deleted_offsets[i].end());
1072  }
1073  return all_deleted_offsets;
1074 }
1075 
1076 template <typename T>
1077 static void set_chunk_stats(const SQLTypeInfo& col_type,
1078  int8_t* data_addr,
1079  int8_t& has_null,
1080  T& min,
1081  T& max) {
1082  T v;
1083  const auto can_be_null = !col_type.get_notnull();
1084  const auto is_null = get_scalar<T>(data_addr, col_type, v);
1085  if (is_null) {
1086  has_null = has_null || (can_be_null && is_null);
1087  } else {
1088  set_minmax(min, max, v);
1089  }
1090 }
1091 
1092 static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
1093  FragmentInfo& fragment,
1094  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1095  const size_t nrows_to_keep,
1096  UpdelRoll& updel_roll) {
1097  auto cd = chunk->get_column_desc();
1098  auto td = catalog->getMetadataForTable(cd->tableId);
1099  auto data_buffer = chunk->get_buffer();
1100  std::lock_guard<std::mutex> lck(updel_roll.mutex);
1101  const auto key = std::make_pair(td, &fragment);
1102  if (0 == updel_roll.chunkMetadata.count(key)) {
1103  updel_roll.chunkMetadata[key] = fragment.getChunkMetadataMapPhysical();
1104  }
1105  auto& chunkMetadata = updel_roll.chunkMetadata[key];
1106  chunkMetadata[cd->columnId].numElements = nrows_to_keep;
1107  chunkMetadata[cd->columnId].numBytes = data_buffer->size();
1108  if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
1109  updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
1110  }
1111 }
1112 
1113 auto vacuum_fixlen_rows(
1114  const FragmentInfo& fragment,
1115  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1116  const std::vector<uint64_t>& frag_offsets) {
1117  const auto cd = chunk->get_column_desc();
1118  const auto& col_type = cd->columnType;
1119  auto data_buffer = chunk->get_buffer();
1120  auto data_addr = data_buffer->getMemoryPtr();
1121  auto element_size =
1122  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1123  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1124  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1125  size_t nbytes_fix_data_to_keep = 0;
1126  auto nrows_to_vacuum = frag_offsets.size();
1127  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1128  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1129  auto is_last_one = irow == nrows_to_vacuum;
1130  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1131  auto maddr_to_vacuum = data_addr;
1132  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1133  if (nrows_to_keep > 0) {
1134  auto nbytes_to_keep = nrows_to_keep * element_size;
1135  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1136  // move curr fixlen row block toward front
1137  memmove(maddr_to_vacuum + irow_of_blk_to_fill * element_size,
1138  maddr_to_vacuum + irow_of_blk_to_keep * element_size,
1139  nbytes_to_keep);
1140  }
1141  irow_of_blk_to_fill += nrows_to_keep;
1142  nbytes_fix_data_to_keep += nbytes_to_keep;
1143  }
1144  irow_of_blk_to_keep = irow_to_vacuum + 1;
1145  }
1146  return nbytes_fix_data_to_keep;
1147 }
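// A small worked example of the block-compaction loop above (illustrative
// only): with 8 fixed-length rows and frag_offsets = {2, 5}, the kept blocks
// are [0,2), [3,5) and [6,8); the latter two are memmove'd forward so the
// survivors occupy rows [0,6), and 6 * element_size bytes are reported kept.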
1148 
1149 auto vacuum_varlen_rows(
1150  const FragmentInfo& fragment,
1151  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
1152  const std::vector<uint64_t>& frag_offsets) {
1153  auto data_buffer = chunk->get_buffer();
1154  auto index_buffer = chunk->get_index_buf();
1155  auto data_addr = data_buffer->getMemoryPtr();
1156  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1157  auto index_array = (StringOffsetT*)indices_addr;
1158  int64_t irow_of_blk_to_keep = 0; // head of next row block to keep
1159  int64_t irow_of_blk_to_fill = 0; // row offset to fit the kept block
1160  size_t nbytes_fix_data_to_keep = 0;
1161  size_t nbytes_var_data_to_keep = 0;
1162  auto nrows_to_vacuum = frag_offsets.size();
1163  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1164  for (size_t irow = 0; irow <= nrows_to_vacuum; irow++) {
1165  auto is_last_one = irow == nrows_to_vacuum;
1166  auto irow_to_vacuum = is_last_one ? nrows_in_fragment : frag_offsets[irow];
1167  auto maddr_to_vacuum = data_addr;
1168  int64_t nrows_to_keep = irow_to_vacuum - irow_of_blk_to_keep;
1169  if (nrows_to_keep > 0) {
1170  auto ibyte_var_data_to_keep = nbytes_var_data_to_keep;
1171  auto nbytes_to_keep =
1172  (is_last_one ? data_buffer->size() : index_array[irow_to_vacuum]) -
1173  index_array[irow_of_blk_to_keep];
1174  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1175  // move curr varlen row block toward front
1176  memmove(data_addr + ibyte_var_data_to_keep,
1177  data_addr + index_array[irow_of_blk_to_keep],
1178  nbytes_to_keep);
1179 
1180  const auto index_base = index_array[irow_of_blk_to_keep];
1181  for (int64_t i = 0; i < nrows_to_keep; ++i) {
1182  auto& index = index_array[irow_of_blk_to_keep + i];
1183  index = ibyte_var_data_to_keep + (index - index_base);
1184  }
1185  }
1186  nbytes_var_data_to_keep += nbytes_to_keep;
1187  maddr_to_vacuum = indices_addr;
1188 
1189  constexpr static auto index_element_size = sizeof(StringOffsetT);
1190  nbytes_to_keep = nrows_to_keep * index_element_size;
1191  if (irow_of_blk_to_fill != irow_of_blk_to_keep) {
1192  // move curr fixlen row block toward front
1193  memmove(maddr_to_vacuum + irow_of_blk_to_fill * index_element_size,
1194  maddr_to_vacuum + irow_of_blk_to_keep * index_element_size,
1195  nbytes_to_keep);
1196  }
1197  irow_of_blk_to_fill += nrows_to_keep;
1198  nbytes_fix_data_to_keep += nbytes_to_keep;
1199  }
1200  irow_of_blk_to_keep = irow_to_vacuum + 1;
1201  }
1202  return nbytes_var_data_to_keep;
1203 }
1204 
1205 void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalog,
1206  const TableDescriptor* td,
1207  const int fragment_id,
1208  const std::vector<uint64_t>& frag_offsets,
1209  const Data_Namespace::MemoryLevel memory_level,
1210  UpdelRoll& updel_roll) {
1211  auto& fragment = getFragmentInfoFromId(fragment_id);
1212  auto chunks = getChunksForAllColumns(td, fragment, memory_level);
1213  const auto ncol = chunks.size();
1214 
1215  std::vector<int8_t> has_null_per_thread(ncol, 0);
1216  std::vector<double> max_double_per_thread(ncol, std::numeric_limits<double>::lowest());
1217  std::vector<double> min_double_per_thread(ncol, std::numeric_limits<double>::max());
1218  std::vector<int64_t> max_int64t_per_thread(ncol, std::numeric_limits<int64_t>::min());
1219  std::vector<int64_t> min_int64t_per_thread(ncol, std::numeric_limits<int64_t>::max());
1220 
1221  // parallel delete columns
1222  std::vector<std::future<void>> threads;
1223  auto nrows_to_vacuum = frag_offsets.size();
1224  auto nrows_in_fragment = fragment.getPhysicalNumTuples();
1225  auto nrows_to_keep = nrows_in_fragment - nrows_to_vacuum;
1226 
1227  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1228  auto chunk = chunks[ci];
1229  const auto cd = chunk->get_column_desc();
1230  const auto& col_type = cd->columnType;
1231  auto data_buffer = chunk->get_buffer();
1232  auto index_buffer = chunk->get_index_buf();
1233  auto data_addr = data_buffer->getMemoryPtr();
1234  auto indices_addr = index_buffer ? index_buffer->getMemoryPtr() : nullptr;
1235  auto index_array = (StringOffsetT*)indices_addr;
1236  bool is_varlen = col_type.is_varlen_indeed();
1237 
1238  auto fixlen_vacuum = [=,
1239  &has_null_per_thread,
1240  &max_double_per_thread,
1241  &min_double_per_thread,
1242  &min_int64t_per_thread,
1243  &max_int64t_per_thread,
1244  &updel_roll,
1245  &frag_offsets,
1246  &fragment] {
1247  size_t nbytes_fix_data_to_keep;
1248  nbytes_fix_data_to_keep = vacuum_fixlen_rows(fragment, chunk, frag_offsets);
1249 
1250  data_buffer->encoder->setNumElems(nrows_to_keep);
1251  data_buffer->setSize(nbytes_fix_data_to_keep);
1252  data_buffer->setUpdated();
1253 
1254  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1255 
1256  auto daddr = data_addr;
1257  auto element_size =
1258  col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type);
1259  for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
1260  if (col_type.is_fixlen_array()) {
1261  auto encoder =
1262  dynamic_cast<FixedLengthArrayNoneEncoder*>(data_buffer->encoder.get());
1263  CHECK(encoder);
1264  encoder->updateMetadata((int8_t*)daddr);
1265  } else if (col_type.is_fp()) {
1266  set_chunk_stats(col_type,
1267  daddr,
1268  has_null_per_thread[ci],
1269  min_double_per_thread[ci],
1270  max_double_per_thread[ci]);
1271  } else {
1272  set_chunk_stats(col_type,
1273  daddr,
1274  has_null_per_thread[ci],
1275  min_int64t_per_thread[ci],
1276  max_int64t_per_thread[ci]);
1277  }
1278  }
1279  };
1280 
1281  auto varlen_vacuum = [=, &updel_roll, &frag_offsets, &fragment] {
1282  size_t nbytes_var_data_to_keep;
1283  nbytes_var_data_to_keep = vacuum_varlen_rows(fragment, chunk, frag_offsets);
1284 
1285  data_buffer->encoder->setNumElems(nrows_to_keep);
1286  data_buffer->setSize(nbytes_var_data_to_keep);
1287  data_buffer->setUpdated();
1288 
1289  index_array[nrows_to_keep] = data_buffer->size();
1290  index_buffer->setSize(sizeof(*index_array) *
1291  (nrows_to_keep ? 1 + nrows_to_keep : 0));
1292  index_buffer->setUpdated();
1293 
1294  set_chunk_metadata(catalog, fragment, chunk, nrows_to_keep, updel_roll);
1295  };
1296 
1297  if (is_varlen) {
1298  threads.emplace_back(std::async(std::launch::async, varlen_vacuum));
1299  } else {
1300  threads.emplace_back(std::async(std::launch::async, fixlen_vacuum));
1301  }
1302  if (threads.size() >= (size_t)cpu_threads()) {
1303  wait_cleanup_threads(threads);
1304  }
1305  }
1306 
1307  wait_cleanup_threads(threads);
1308 
1309  auto key = std::make_pair(td, &fragment);
1310  updel_roll.numTuples[key] = nrows_to_keep;
1311  for (size_t ci = 0; ci < chunks.size(); ++ci) {
1312  auto chunk = chunks[ci];
1313  auto cd = chunk->get_column_desc();
1314  if (!cd->columnType.is_fixlen_array()) {
1315  updateColumnMetadata(cd,
1316  fragment,
1317  chunk,
1318  has_null_per_thread[ci],
1319  max_double_per_thread[ci],
1320  min_double_per_thread[ci],
1321  max_int64t_per_thread[ci],
1322  min_int64t_per_thread[ci],
1323  cd->columnType,
1324  updel_roll);
1325  }
1326  }
1327 }
1328 
1329 } // namespace Fragmenter_Namespace
1330 
1331 void UpdelRoll::commitUpdate() {
1332  if (nullptr == catalog) {
1333  return;
1334  }
1335  const auto td = catalog->getMetadataForTable(logicalTableId);
1336  CHECK(td);
1337  // checkpoint all shards regardless, or epoch becomes out of sync
1338  if (td->persistenceLevel == Data_Namespace::MemoryLevel::DISK_LEVEL) {
1339  catalog->checkpoint(logicalTableId);
1340  }
1341  // for each dirty fragment
1342  for (auto& cm : chunkMetadata) {
1343  cm.first.first->fragmenter->updateMetadata(catalog, cm.first, *this);
1344  }
1345  dirtyChunks.clear();
1346  // flush gpu dirty chunks if update was not on gpu
1347  if (memoryLevel != Data_Namespace::MemoryLevel::GPU_LEVEL) {
1348  for (const auto& chunkey : dirtyChunkeys) {
1349  catalog->getDataMgr().deleteChunksWithPrefix(
1350  chunkey, Data_Namespace::MemoryLevel::GPU_LEVEL);
1351  }
1352  }
1353 }
1354 
1355 void UpdelRoll::cancelUpdate() {
1356  if (nullptr == catalog) {
1357  return;
1358  }
1359 
1360  if (is_varlen_update) {
1361  int databaseId = catalog->getCurrentDB().dbId;
1362  int32_t tableEpoch = catalog->getTableEpoch(databaseId, logicalTableId);
1363 
1364  dirtyChunks.clear();
1365  const_cast<Catalog_Namespace::Catalog*>(catalog)->setTableEpoch(
1366  databaseId, logicalTableId, tableEpoch);
1367  } else {
1368  const auto td = catalog->getMetadataForTable(logicalTableId);
1369  CHECK(td);
1370  if (td->persistenceLevel != memoryLevel) {
1371  for (auto dit : dirtyChunks) {
1372  catalog->getDataMgr().free(dit.first->get_buffer());
1373  dit.first->set_buffer(nullptr);
1374  }
1375  }
1376  }
1377 }