StorageIOFacility.h
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #pragma once
18 
19 #include <future>
20 
22 #include "LockMgr/LockMgr.h"
24 #include "QueryEngine/Execute.h"
28 #include "Shared/UpdelRoll.h"
29 #include "Shared/likely.h"
30 #include "Shared/thread_count.h"
31 
33 
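
// StorageIOFacility supplies the query engine with per-fragment callbacks that
// write UPDATE results and DELETE markers back through each table's fragmenter
// under a shared transaction tracker. The helpers in the anonymous namespace
// below decide when an in-place update has invalidated a chunk's min/max
// metadata.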

namespace {

bool is_chunk_min_max_updated(const Fragmenter_Namespace::ChunkUpdateStats& update_stats,
                              int64_t min,
                              int64_t max) {
  if (update_stats.old_values_stats.min_int64t <
          update_stats.new_values_stats.min_int64t &&
      update_stats.old_values_stats.min_int64t == min) {
    return true;
  }
  if (update_stats.old_values_stats.max_int64t >
          update_stats.new_values_stats.max_int64t &&
      update_stats.old_values_stats.max_int64t == max) {
    return true;
  }
  return false;
}

bool is_chunk_min_max_updated(const Fragmenter_Namespace::ChunkUpdateStats& update_stats,
                              double min,
                              double max) {
  if (update_stats.old_values_stats.min_double <
          update_stats.new_values_stats.min_double &&
      update_stats.old_values_stats.min_double == min) {
    return true;
  }
  if (update_stats.old_values_stats.max_double >
          update_stats.new_values_stats.max_double &&
      update_stats.old_values_stats.max_double == max) {
    return true;
  }
  return false;
}

bool should_recompute_metadata(
    const std::optional<Fragmenter_Namespace::ChunkUpdateStats>& update_stats) {
  if (!g_enable_auto_metadata_update || !update_stats.has_value()) {
    return false;
  }

  CHECK(update_stats->chunk);
  CHECK(update_stats->chunk->getBuffer());
  CHECK(update_stats->chunk->getBuffer()->getEncoder());

  auto chunk_metadata = std::make_shared<ChunkMetadata>();
  update_stats->chunk->getBuffer()->getEncoder()->getMetadata(chunk_metadata);
  auto cd = update_stats.value().chunk->getColumnDesc();
  if (cd->columnType.is_fp()) {
    double min, max;
    if (cd->columnType.get_type() == kDOUBLE) {
      min = chunk_metadata->chunkStats.min.doubleval;
      max = chunk_metadata->chunkStats.max.doubleval;
    } else if (cd->columnType.get_type() == kFLOAT) {
      min = chunk_metadata->chunkStats.min.floatval;
      max = chunk_metadata->chunkStats.max.floatval;
    } else {
      min = 0;  // resolve compiler warning about uninitialized variables
      max = -1;
      UNREACHABLE();
    }
    return is_chunk_min_max_updated(update_stats.value(), min, max);
  } else {
    auto min = extract_min_stat_int_type(chunk_metadata->chunkStats, cd->columnType);
    auto max = extract_max_stat_int_type(chunk_metadata->chunkStats, cd->columnType);
    return is_chunk_min_max_updated(update_stats.value(), min, max);
  }
}
}  // namespace
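
// A hypothetical walk-through of the check above: suppose an integer chunk's
// stored metadata is min = 5, max = 100, and an UPDATE rewrites the row holding
// 5 with the value 10. Then old_values_stats.min_int64t == 5 equals the
// chunk-wide min while new_values_stats.min_int64t == 10 is larger, so
// is_chunk_min_max_updated() returns true and should_recompute_metadata()
// requests a metadata recompute: the stored min is now stale (too wide).
// Updates that touch neither the chunk min nor the chunk max leave the stored
// range valid and return false.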

class StorageIOFacility {
 public:
  using UpdateCallback = UpdateLogForFragment::Callback;

  using TableDescriptorType = TableDescriptor;
  using DeleteVictimOffsetList = std::vector<uint64_t>;
  using UpdateTargetOffsetList = std::vector<uint64_t>;
  using UpdateTargetTypeList = std::vector<TargetMetaInfo>;
  using UpdateTargetColumnNamesList = std::vector<std::string>;

  using TransactionLog =
      Fragmenter_Namespace::InsertOrderFragmenter::ModifyTransactionTracker;
  using TransactionLogPtr = std::unique_ptr<TransactionLog>;
  using ColumnValidationFunction = std::function<bool(std::string const&)>;

  class TransactionParameters {
   public:
    TransactionParameters(const TableDescriptorType* table_descriptor,
                          const Catalog_Namespace::Catalog& catalog)
        : table_descriptor_(table_descriptor)
        , table_is_temporary_(table_is_temporary(table_descriptor))
        , catalog_(catalog) {}

    virtual ~TransactionParameters() = default;

    StorageIOFacility::TransactionLog& getTransactionTracker() {
      return transaction_tracker_;
    }

    void finalizeTransaction(const Catalog_Namespace::Catalog& catalog) {
      auto update_occurred = transaction_tracker_.commitUpdate();
      if (!update_occurred && table_descriptor_->persistenceLevel ==
                                  Data_Namespace::MemoryLevel::DISK_LEVEL) {
        // If commitUpdate() did not checkpoint, then we need to checkpoint here in
        // order to ensure that epochs are uniformly incremented in distributed mode.
        catalog.checkpointWithAutoRollback(table_descriptor_->tableId);
      }
    }

    auto tableIsTemporary() const { return table_is_temporary_; }

    auto const* getTableDescriptor() const { return table_descriptor_; }

    const Catalog_Namespace::Catalog& getCatalog() const { return catalog_; }

    const RelAlgNode* getInputSourceNode() const { return input_source_node_; }

    void setInputSourceNode(const RelAlgNode* input_source_node) {
      input_source_node_ = input_source_node;
    }

   private:
    StorageIOFacility::TransactionLog transaction_tracker_;
    TableDescriptorType const* table_descriptor_;
    bool table_is_temporary_;
    const Catalog_Namespace::Catalog& catalog_;
    const RelAlgNode* input_source_node_;
  };

  class DeleteTransactionParameters : public TransactionParameters {
   public:
    DeleteTransactionParameters(const TableDescriptorType* table_descriptor,
                                const Catalog_Namespace::Catalog& catalog)
        : TransactionParameters(table_descriptor, catalog) {}

   private:
    DeleteTransactionParameters(DeleteTransactionParameters const& other) = delete;
    DeleteTransactionParameters& operator=(DeleteTransactionParameters const& other) =
        delete;
  };

  class UpdateTransactionParameters : public TransactionParameters {
   public:
    UpdateTransactionParameters(TableDescriptorType const* table_descriptor,
                                const Catalog_Namespace::Catalog& catalog,
                                UpdateTargetColumnNamesList const& update_column_names,
                                UpdateTargetTypeList const& target_types,
                                bool varlen_update_required)
        : TransactionParameters(table_descriptor, catalog)
        , update_column_names_(update_column_names)
        , targets_meta_(target_types)
        , varlen_update_required_(varlen_update_required) {}

    auto getUpdateColumnCount() const { return update_column_names_.size(); }
    auto const& getTargetsMetaInfo() const { return targets_meta_; }
    auto getTargetsMetaInfoSize() const { return targets_meta_.size(); }
    auto const& getUpdateColumnNames() const { return update_column_names_; }
    auto isVarlenUpdateRequired() const { return varlen_update_required_; }

   private:
    UpdateTransactionParameters(UpdateTransactionParameters const& other) = delete;
    UpdateTransactionParameters& operator=(UpdateTransactionParameters const& other) =
        delete;

    UpdateTargetColumnNamesList update_column_names_;
    UpdateTargetTypeList const& targets_meta_;
    bool varlen_update_required_ = false;
  };

  StorageIOFacility(Executor* executor) : executor_(executor) {}

  StorageIOFacility::UpdateCallback yieldUpdateCallback(
      UpdateTransactionParameters& update_parameters) {
    using OffsetVector = std::vector<uint64_t>;
    using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
    using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;

    if (update_parameters.isVarlenUpdateRequired()) {
      auto callback = [this, &update_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        std::vector<const ColumnDescriptor*> columnDescriptors;
        std::vector<TargetMetaInfo> sourceMetaInfos;

        const auto& catalog = update_parameters.getCatalog();
        for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size();
             idx++) {
          auto& column_name = update_parameters.getUpdateColumnNames()[idx];
          auto target_column =
              catalog.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
          columnDescriptors.push_back(target_column);
          sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
        }

        auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
        auto* fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        fragmenter->updateColumns(
            &catalog,
            td,
            update_log.getFragmentId(),
            sourceMetaInfos,
            columnDescriptors,
            update_log,
            update_parameters.getUpdateColumnCount(),  // last column of result set
            Data_Namespace::MemoryLevel::CPU_LEVEL,
            update_parameters.getTransactionTracker(),
            executor_);
        table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace(
            update_log.getFragmentId());
      };
      return callback;
    } else if (update_parameters.tableIsTemporary()) {
      auto callback = [&update_parameters](UpdateLogForFragment const& update_log,
                                           TableUpdateMetadata&) -> void {
        auto rs = update_log.getResultSet();
        CHECK(rs->didOutputColumnar());
        CHECK(rs->isDirectColumnarConversionPossible());
        CHECK_EQ(update_parameters.getUpdateColumnCount(), size_t(1));
        CHECK_EQ(rs->colCount(), size_t(1));

        // Temporary table updates require the full projected column
        CHECK_EQ(rs->rowCount(), update_log.getRowCount());

        const auto& catalog = update_parameters.getCatalog();
        ChunkKey chunk_key_prefix{catalog.getCurrentDB().dbId,
                                  update_parameters.getTableDescriptor()->tableId};
        const auto table_lock =
            lockmgr::TableDataLockMgr::getWriteLockForTable(chunk_key_prefix);

        auto& fragment_info = update_log.getFragmentInfo();
        const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
        CHECK(td);
        const auto cd = catalog.getMetadataForColumn(
            td->tableId, update_parameters.getUpdateColumnNames().front());
        CHECK(cd);
        auto chunk_metadata =
            fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
        CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{catalog.getCurrentDB().dbId,
                           td->tableId,
                           cd->columnId,
                           fragment_info.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog.getDataMgr(),
                                               chunk_key,
                                               Data_Namespace::MemoryLevel::CPU_LEVEL,
                                               0,
                                               chunk_metadata->second->numBytes,
                                               chunk_metadata->second->numElements);
        CHECK(chunk);
        auto chunk_buffer = chunk->getBuffer();
        CHECK(chunk_buffer);

        auto encoder = chunk_buffer->getEncoder();
        CHECK(encoder);

        auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
            rs.get(), 0, cd->columnType, rs->rowCount());
        auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());

        const auto new_chunk_metadata =
            encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);
        CHECK(new_chunk_metadata);

        auto fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        // The fragmenter copy of the fragment info differs from the copy used by the
        // query engine. Update metadata in the fragmenter directly.
        auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
        // TODO: we may want to put this directly in the fragmenter so we are under
        // the fragmenter lock. But concurrent queries on the same fragmenter should
        // not be allowed in this path.

        fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
        fragment->shadowChunkMetadataMap =
            fragment->getChunkMetadataMapPhysicalCopy();  // TODO(adb): needed?

        auto& data_mgr = catalog.getDataMgr();
        if (data_mgr.gpusPresent()) {
          // Flush any GPU copies of the updated chunk.
          data_mgr.deleteChunksWithPrefix(chunk_key,
                                          Data_Namespace::MemoryLevel::GPU_LEVEL);
        }
      };
      return callback;
    } else {
      auto callback = [this, &update_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        auto entries_per_column = update_log.getEntryCount();
        auto rows_per_column = update_log.getRowCount();
        if (rows_per_column == 0) {
          return;
        }

        OffsetVector column_offsets(rows_per_column);
        ScalarTargetValueVector scalar_target_values(rows_per_column);

        auto complete_entry_block_size = entries_per_column / normalized_cpu_threads();
        auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
        auto usable_threads = normalized_cpu_threads();
        if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
          complete_entry_block_size = entries_per_column;
          partial_row_block_size = 0;
          usable_threads = 1;
        }

        std::atomic<size_t> row_idx{0};

        auto process_rows =
            [&update_parameters, &column_offsets, &scalar_target_values, &row_idx](
                auto get_entry_at_func,
                uint64_t column_index,
                uint64_t entry_start,
                uint64_t entry_count) -> uint64_t {
          uint64_t entries_processed = 0;
          for (uint64_t entry_index = entry_start;
               entry_index < (entry_start + entry_count);
               entry_index++) {
            const auto& row = get_entry_at_func(entry_index);
            if (row.empty()) {
              continue;
            }

            entries_processed++;
            size_t row_index = row_idx.fetch_add(1);

            CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);

            auto terminal_column_iter = std::prev(row.end());
            const auto frag_offset_scalar_tv =
                boost::get<ScalarTargetValue>(&*terminal_column_iter);
            CHECK(frag_offset_scalar_tv);

            column_offsets[row_index] =
                static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
            scalar_target_values[row_index] =
                boost::get<ScalarTargetValue>(row[column_index]);
          }
          return entries_processed;
        };

        auto get_row_index =
            [complete_entry_block_size](uint64_t thread_index) -> uint64_t {
          return (thread_index * complete_entry_block_size);
        };

        const auto& catalog = update_parameters.getCatalog();
        auto const* table_descriptor =
            catalog.getMetadataForTable(update_log.getPhysicalTableId());
        auto fragment_id = update_log.getFragmentId();
        auto table_id = update_log.getPhysicalTableId();
        if (!table_descriptor) {
          const auto* input_source_node = update_parameters.getInputSourceNode();
          if (auto proj_node = dynamic_cast<const RelProject*>(input_source_node)) {
            if (proj_node->hasPushedDownWindowExpr() ||
                proj_node->hasWindowFunctionExpr()) {
              table_id = proj_node->getModifiedTableDescriptor()->tableId;
              table_descriptor = catalog.getMetadataForTable(table_id);
            }
          }
        }
        CHECK(table_descriptor);

        // Iterate over each column
        for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
             column_index < update_parameters.getUpdateColumnCount();
             column_index++) {
          row_idx = 0;
          RowProcessingFuturesVector entry_processing_futures;
          entry_processing_futures.reserve(usable_threads);

          auto get_entry_at_func = [&update_log,
                                    &column_index](const size_t entry_index) {
            if (UNLIKELY(update_log.getColumnType(column_index).is_string())) {
              return update_log.getTranslatedEntryAt(entry_index);
            } else {
              return update_log.getEntryAt(entry_index);
            }
          };

          for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
            entry_processing_futures.emplace_back(
                std::async(std::launch::async,
                           std::forward<decltype(process_rows)>(process_rows),
                           get_entry_at_func,
                           column_index,
                           get_row_index(i),
                           complete_entry_block_size));
          }
          if (partial_row_block_size) {
            entry_processing_futures.emplace_back(
                std::async(std::launch::async,
                           std::forward<decltype(process_rows)>(process_rows),
                           get_entry_at_func,
                           column_index,
                           get_row_index(usable_threads),
                           partial_row_block_size));
          }

          uint64_t entries_processed(0);
          for (auto& t : entry_processing_futures) {
            t.wait();
            entries_processed += t.get();
          }

          CHECK(row_idx == rows_per_column);
          const auto fragmenter = table_descriptor->fragmenter;
          CHECK(fragmenter);
          auto const* target_column = catalog.getMetadataForColumn(
              table_id, update_parameters.getUpdateColumnNames()[column_index]);
          CHECK(target_column);
          auto update_stats =
              fragmenter->updateColumn(&catalog,
                                       table_descriptor,
                                       target_column,
                                       fragment_id,
                                       column_offsets,
                                       scalar_target_values,
                                       update_log.getColumnType(column_index),
                                       Data_Namespace::MemoryLevel::CPU_LEVEL,
                                       update_parameters.getTransactionTracker());
          if (should_recompute_metadata(update_stats)) {
            table_update_metadata.columns_for_metadata_update[target_column].emplace(
                fragment_id);
          }
        }
      };
      return callback;
    }
  }
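
  // A hypothetical sizing walk-through for the partitioning scheme shared by
  // the callbacks above and below: with entries_per_column = 1000 and
  // normalized_cpu_threads() = 8, each of the eight std::async workers scans a
  // complete block of 125 entries and there is no partial block; with 1003
  // entries, a ninth future covers the trailing 3-entry remainder. Because
  // result-set entries can be empty, the shared atomic row_idx compacts the
  // surviving offsets into the output vectors in arrival order.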

  StorageIOFacility::UpdateCallback yieldDeleteCallback(
      DeleteTransactionParameters& delete_parameters) {
    using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;

    if (delete_parameters.tableIsTemporary()) {
      auto logical_table_id = delete_parameters.getTableDescriptor()->tableId;
      const auto& catalog = delete_parameters.getCatalog();
      auto callback = [logical_table_id,
                       &catalog](UpdateLogForFragment const& update_log,
                                 TableUpdateMetadata&) -> void {
        auto rs = update_log.getResultSet();
        CHECK(rs->didOutputColumnar());
        CHECK(rs->isDirectColumnarConversionPossible());
        CHECK_EQ(rs->colCount(), size_t(1));

        // Temporary table updates require the full projected column
        CHECK_EQ(rs->rowCount(), update_log.getRowCount());

        const ChunkKey lock_chunk_key{catalog.getCurrentDB().dbId, logical_table_id};
        const auto table_lock =
            lockmgr::TableDataLockMgr::getWriteLockForTable(lock_chunk_key);

        auto& fragment_info = update_log.getFragmentInfo();
        const auto td = catalog.getMetadataForTable(update_log.getPhysicalTableId());
        CHECK(td);
        const auto cd = catalog.getDeletedColumn(td);
        CHECK(cd);
        CHECK(cd->columnType.get_type() == kBOOLEAN);
        auto chunk_metadata =
            fragment_info.getChunkMetadataMapPhysical().find(cd->columnId);
        CHECK(chunk_metadata != fragment_info.getChunkMetadataMapPhysical().end());
        ChunkKey chunk_key{catalog.getCurrentDB().dbId,
                           td->tableId,
                           cd->columnId,
                           fragment_info.fragmentId};
        auto chunk = Chunk_NS::Chunk::getChunk(cd,
                                               &catalog.getDataMgr(),
                                               chunk_key,
                                               Data_Namespace::MemoryLevel::CPU_LEVEL,
                                               0,
                                               chunk_metadata->second->numBytes,
                                               chunk_metadata->second->numElements);
        CHECK(chunk);
        auto chunk_buffer = chunk->getBuffer();
        CHECK(chunk_buffer);

        auto encoder = chunk_buffer->getEncoder();
        CHECK(encoder);

        auto owned_buffer = StorageIOFacility::getRsBufferNoPadding(
            rs.get(), 0, cd->columnType, rs->rowCount());
        auto buffer = reinterpret_cast<int8_t*>(owned_buffer.get());

        const auto new_chunk_metadata =
            encoder->appendData(buffer, rs->rowCount(), cd->columnType, false, 0);

        auto fragmenter = td->fragmenter.get();
        CHECK(fragmenter);

        // The fragmenter copy of the fragment info differs from the copy used by the
        // query engine. Update metadata in the fragmenter directly.
        auto fragment = fragmenter->getFragmentInfo(fragment_info.fragmentId);
        // TODO: we may want to put this directly in the fragmenter so we are under
        // the fragmenter lock. But concurrent queries on the same fragmenter should
        // not be allowed in this path.

        fragment->setChunkMetadata(cd->columnId, new_chunk_metadata);
        fragment->shadowChunkMetadataMap =
            fragment->getChunkMetadataMapPhysicalCopy();  // TODO(adb): needed?

        auto& data_mgr = catalog.getDataMgr();
        if (data_mgr.gpusPresent()) {
          // Flush any GPU copies of the updated chunk.
          data_mgr.deleteChunksWithPrefix(chunk_key,
                                          Data_Namespace::MemoryLevel::GPU_LEVEL);
        }
      };
      return callback;
    } else {
      auto callback = [this, &delete_parameters](
                          UpdateLogForFragment const& update_log,
                          TableUpdateMetadata& table_update_metadata) -> void {
        auto entries_per_column = update_log.getEntryCount();
        auto rows_per_column = update_log.getRowCount();
        if (rows_per_column == 0) {
          return;
        }
        DeleteVictimOffsetList victim_offsets(rows_per_column);

        auto complete_row_block_size = entries_per_column / normalized_cpu_threads();
        auto partial_row_block_size = entries_per_column % normalized_cpu_threads();
        auto usable_threads = normalized_cpu_threads();

        if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
          complete_row_block_size = rows_per_column;
          partial_row_block_size = 0;
          usable_threads = 1;
        }

        std::atomic<size_t> row_idx{0};

        auto process_rows = [&update_log, &victim_offsets, &row_idx](
                                uint64_t entry_start,
                                uint64_t entry_count) -> uint64_t {
          uint64_t entries_processed = 0;

          for (uint64_t entry_index = entry_start;
               entry_index < (entry_start + entry_count);
               entry_index++) {
            auto const row(update_log.getEntryAt(entry_index));

            if (row.empty()) {
              continue;
            }

            entries_processed++;
            size_t row_index = row_idx.fetch_add(1);

            auto terminal_column_iter = std::prev(row.end());
            const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
            CHECK(scalar_tv);

            uint64_t fragment_offset =
                static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
            victim_offsets[row_index] = fragment_offset;
          }
          return entries_processed;
        };

        auto get_row_index =
            [complete_row_block_size](uint64_t thread_index) -> uint64_t {
          return thread_index * complete_row_block_size;
        };

        RowProcessingFuturesVector row_processing_futures;
        row_processing_futures.reserve(usable_threads);

        for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
          row_processing_futures.emplace_back(
              std::async(std::launch::async,
                         std::forward<decltype(process_rows)>(process_rows),
                         get_row_index(i),
                         complete_row_block_size));
        }
        if (partial_row_block_size) {
          row_processing_futures.emplace_back(
              std::async(std::launch::async,
                         std::forward<decltype(process_rows)>(process_rows),
                         get_row_index(usable_threads),
                         partial_row_block_size));
        }

        uint64_t rows_processed(0);
        for (auto& t : row_processing_futures) {
          t.wait();
          rows_processed += t.get();
        }

        const auto& catalog = delete_parameters.getCatalog();
        auto const* table_descriptor =
            catalog.getMetadataForTable(update_log.getPhysicalTableId());
        CHECK(table_descriptor);
        CHECK(!table_is_temporary(table_descriptor));
        auto* fragmenter = table_descriptor->fragmenter.get();
        CHECK(fragmenter);

        auto const* deleted_column_desc = catalog.getDeletedColumn(table_descriptor);
        CHECK(deleted_column_desc);
        fragmenter->updateColumn(&catalog,
                                 table_descriptor,
                                 deleted_column_desc,
                                 update_log.getFragmentId(),
                                 victim_offsets,
                                 ScalarTargetValue(int64_t(1L)),
                                 update_log.getColumnType(0),
                                 Data_Namespace::MemoryLevel::CPU_LEVEL,
                                 delete_parameters.getTransactionTracker());
        table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId]
            .emplace(update_log.getFragmentId());
      };
      return callback;
    }
  }
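
  // Note: both branches above implement DELETE as an update of the hidden
  // boolean "deleted" column (catalog.getDeletedColumn()). Victim rows are
  // marked rather than physically removed; the fragments recorded in
  // fragments_with_deleted_rows are expected to be compacted later by a
  // vacuum/optimize pass.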

 private:
  int normalized_cpu_threads() const { return cpu_threads() / 2; }

  static std::unique_ptr<int8_t[]> getRsBufferNoPadding(const ResultSet* rs,
                                                        size_t col_idx,
                                                        const SQLTypeInfo& column_type,
                                                        size_t row_count) {
    const auto padded_size = rs->getPaddedSlotWidthBytes(col_idx);
    const auto type_size = column_type.is_dict_encoded_string()
                               ? column_type.get_size()
                               : column_type.get_logical_size();

    auto rs_buffer_size = padded_size * row_count;
    auto rs_buffer = std::make_unique<int8_t[]>(rs_buffer_size);
    rs->copyColumnIntoBuffer(col_idx, rs_buffer.get(), rs_buffer_size);

    if (type_size < padded_size) {
      // Remove the padding in place in the same buffer; this is safe because
      // type_size < padded_size. For some types, like kFLOAT, a simple memcpy
      // is not enough since the value itself must be narrowed.
      auto src_ptr = rs_buffer.get();
      auto dst_ptr = rs_buffer.get();
      if (column_type.is_fp()) {
        CHECK(column_type.get_type() == kFLOAT);
        CHECK(padded_size == sizeof(double));
        for (size_t i = 0; i < row_count; i++) {
          const auto old_val = *reinterpret_cast<double*>(may_alias_ptr(src_ptr));
          auto new_val = static_cast<float>(old_val);
          std::memcpy(dst_ptr, &new_val, type_size);
          dst_ptr += type_size;
          src_ptr += padded_size;
        }
      } else {
        // Otherwise just take the first type_size bytes of each padded value.
        for (size_t i = 0; i < row_count; i++) {
          std::memcpy(dst_ptr, src_ptr, type_size);
          dst_ptr += type_size;
          src_ptr += padded_size;
        }
      }
    }
    return rs_buffer;
  }
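
  // A hypothetical walk-through of the narrowing above: a kFLOAT column can
  // come back from the ResultSet in 8-byte padded slots holding doubles. For
  // row_count = 3 and values {1.0, 2.0, 3.0}, the fp branch converts each
  // 8-byte double to a 4-byte float in place, leaving a dense 12-byte buffer
  // for the encoder's appendData(). Integer columns keep the first type_size
  // bytes of each slot, which on little-endian hosts are the value's low-order
  // bytes.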
695 
696  Executor* executor_;
697 };
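
// A minimal usage sketch (assumed wiring, not from this file; the per-fragment
// UpdateLogForFragment instances are produced elsewhere by the relational
// algebra executor):
//
//   StorageIOFacility io_facility(executor);
//   StorageIOFacility::UpdateTransactionParameters params(
//       td, catalog, {"col"}, target_types, /*varlen_update_required=*/false);
//   auto callback = io_facility.yieldUpdateCallback(params);
//   TableUpdateMetadata table_update_metadata;
//   for (auto const& update_log : per_fragment_update_logs) {  // hypothetical
//     callback(update_log, table_update_metadata);
//   }
//   params.finalizeTransaction(catalog);  // checkpoint if nothing else did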