OmniSciDB  04ee39c94c
StorageIOFacility.h
Go to the documentation of this file.
1 #ifndef STORAGEIOFACILITY_H
2 #define STORAGEIOFACILITY_H
3 
6 #include "TargetMetaInfo.h"
7 
8 #include <boost/variant.hpp>
9 #include "Shared/ConfigResolve.h"
11 #include "Shared/UpdelRoll.h"
12 #include "Shared/likely.h"
13 #include "Shared/thread_count.h"
14 
15 #include <future>
16 
17 template <typename FRAGMENTER_TYPE = Fragmenter_Namespace::InsertOrderFragmenter>
19  public:
20  using FragmenterType = FRAGMENTER_TYPE;
21  using DeleteVictimOffsetList = std::vector<uint64_t>;
22  using UpdateTargetOffsetList = std::vector<uint64_t>;
23  using UpdateTargetTypeList = std::vector<TargetMetaInfo>;
24  using UpdateTargetColumnNamesList = std::vector<std::string>;
25  using TransactionLog = typename FragmenterType::ModifyTransactionTracker;
26  using TransactionLogPtr = std::unique_ptr<TransactionLog>;
27  using ColumnValidationFunction = std::function<bool(std::string const&)>;
28 
29  template <typename CATALOG_TYPE,
30  typename TABLE_ID_TYPE,
31  typename COLUMN_NAME_TYPE,
32  typename FRAGMENT_ID_TYPE,
33  typename FRAGMENT_OFFSET_LIST_TYPE,
34  typename UPDATE_VALUES_LIST_TYPE,
35  typename COLUMN_TYPE_INFO>
36  static void updateColumn(CATALOG_TYPE const& cat,
37  TABLE_ID_TYPE const&& table_id,
38  COLUMN_NAME_TYPE const& column_name,
39  FRAGMENT_ID_TYPE const frag_id,
40  FRAGMENT_OFFSET_LIST_TYPE const& frag_offsets,
41  UPDATE_VALUES_LIST_TYPE const& update_values,
42  COLUMN_TYPE_INFO const& col_type_info,
43  TransactionLog& transaction_tracker) {
44  auto const* table_descriptor = cat.getMetadataForTable(table_id);
45  auto* fragmenter = table_descriptor->fragmenter;
46  CHECK(fragmenter);
47  auto const* target_column = cat.getMetadataForColumn(table_id, column_name);
48 
49  fragmenter->updateColumn(&cat,
50  table_descriptor,
51  target_column,
52  frag_id,
53  frag_offsets,
54  update_values,
55  col_type_info,
57  transaction_tracker);
58  }
59 
60  template <typename CATALOG_TYPE,
61  typename TABLE_ID_TYPE,
62  typename FRAGMENT_ID_TYPE,
63  typename VICTIM_OFFSET_LIST,
64  typename COLUMN_TYPE_INFO>
65  static void deleteColumns(CATALOG_TYPE const& cat,
66  TABLE_ID_TYPE const&& table_id,
67  FRAGMENT_ID_TYPE const frag_id,
68  VICTIM_OFFSET_LIST& victims,
69  COLUMN_TYPE_INFO const& col_type_info,
70  TransactionLog& transaction_tracker) {
71  auto const* table_descriptor = cat.getMetadataForTable(table_id);
72  auto* fragmenter = table_descriptor->fragmenter;
73  CHECK(fragmenter);
74 
75  auto const* deleted_column_desc = cat.getDeletedColumn(table_descriptor);
76  if (deleted_column_desc != nullptr) {
77  fragmenter->updateColumn(&cat,
78  table_descriptor,
79  deleted_column_desc,
80  frag_id,
81  victims,
82  ScalarTargetValue(int64_t(1L)),
83  col_type_info,
85  transaction_tracker);
86  } else {
87  LOG(INFO) << "Delete metadata column unavailable; skipping delete operation.";
88  }
89  }
90 
// Produces the column-name validation predicate for this I/O facet.
// The default facet performs no validation: the returned predicate accepts
// every column name, and neither the catalog nor the table descriptor is read.
template <typename CATALOG_TYPE, typename TABLE_DESCRIPTOR_TYPE>
static std::function<bool(std::string const&)> yieldColumnValidator(
    CATALOG_TYPE const& cat,
    TABLE_DESCRIPTOR_TYPE const* table_descriptor) {
  auto const accept_every_column = [](std::string const&) -> bool { return true; };
  return std::function<bool(std::string const&)>(accept_every_column);
}
97 };
98 
99 template <typename EXECUTOR_TRAITS,
100  typename IO_FACET = DefaultIOFacet<>,
101  typename FRAGMENT_UPDATER = UpdateLogForFragment>
103  public:
104  using ExecutorType = typename EXECUTOR_TRAITS::ExecutorType;
105  using CatalogType = typename EXECUTOR_TRAITS::CatalogType;
106  using FragmentUpdaterType = FRAGMENT_UPDATER;
108  using IOFacility = IO_FACET;
109  using TableDescriptorType = typename EXECUTOR_TRAITS::TableDescriptorType;
114  using UpdateTargetColumnNameType = typename UpdateTargetColumnNamesList::value_type;
116 
119 
120  struct MethodSelector {
121  static constexpr auto getEntryAt(StringSelector) {
122  return &FragmentUpdaterType::getTranslatedEntryAt;
123  }
124  static constexpr auto getEntryAt(NonStringSelector) {
125  return &FragmentUpdaterType::getEntryAt;
126  }
127  };
128 
130  public:
131  typename IOFacility::TransactionLog& getTransactionTracker() {
132  return transaction_tracker_;
133  }
134  void finalizeTransaction() { transaction_tracker_.commitUpdate(); }
135 
136  private:
137  typename IOFacility::TransactionLog transaction_tracker_;
138  };
139 
141  public:
143 
144  private:
147  delete;
148  };
149 
151  public:
153  UpdateTargetColumnNamesList const& update_column_names,
154  UpdateTargetTypeList const& target_types,
155  bool varlen_update_required)
156  : table_descriptor_(table_desc)
157  , update_column_names_(update_column_names)
158  , targets_meta_(target_types)
159  , varlen_update_required_(varlen_update_required){};
160 
161  auto getUpdateColumnCount() const { return update_column_names_.size(); }
162  auto const* getTableDescriptor() const { return table_descriptor_; }
163  auto const& getTargetsMetaInfo() const { return targets_meta_; }
164  auto getTargetsMetaInfoSize() const { return targets_meta_.size(); }
165  auto const& getUpdateColumnNames() const { return update_column_names_; }
166  auto isVarlenUpdateRequired() const { return varlen_update_required_; }
167 
168  private:
171  delete;
172 
176  bool varlen_update_required_ = false;
177  };
178 
179  StorageIOFacility(ExecutorType* executor, CatalogType const& catalog)
180  : executor_(executor), catalog_(catalog) {}
181 
183  TableDescriptorType const* table_descriptor) {
184  return IOFacility::yieldColumnValidator(catalog_, table_descriptor);
185  }
186 
187  UpdateCallback yieldUpdateCallback(UpdateTransactionParameters& update_parameters);
188  UpdateCallback yieldDeleteCallback(DeleteTransactionParameters& delete_parameters);
189 
190  private:
191  int normalized_cpu_threads() const { return cpu_threads() / 2; }
192 
195 };
196 
197 template <typename EXECUTOR_TRAITS, typename IO_FACET, typename FRAGMENT_UPDATER>
200  UpdateTransactionParameters& update_parameters) {
201  using OffsetVector = std::vector<uint64_t>;
202  using ScalarTargetValueVector = std::vector<ScalarTargetValue>;
203  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
204 
205  if (update_parameters.isVarlenUpdateRequired()) {
206  auto callback = [this,
207  &update_parameters](FragmentUpdaterType const& update_log) -> void {
208  std::vector<const ColumnDescriptor*> columnDescriptors;
209  std::vector<TargetMetaInfo> sourceMetaInfos;
210 
211  for (size_t idx = 0; idx < update_parameters.getUpdateColumnNames().size(); idx++) {
212  auto& column_name = update_parameters.getUpdateColumnNames()[idx];
213  auto target_column =
214  catalog_.getMetadataForColumn(update_log.getPhysicalTableId(), column_name);
215  columnDescriptors.push_back(target_column);
216  sourceMetaInfos.push_back(update_parameters.getTargetsMetaInfo()[idx]);
217  }
218 
219  auto td = catalog_.getMetadataForTable(update_log.getPhysicalTableId());
220  auto* fragmenter = td->fragmenter;
221  CHECK(fragmenter);
222 
223  fragmenter->updateColumns(
224  &catalog_,
225  td,
226  update_log.getFragmentId(),
227  sourceMetaInfos,
228  columnDescriptors,
229  update_log,
230  update_parameters.getUpdateColumnCount(), // last column of result set
232  update_parameters.getTransactionTracker());
233  };
234  return callback;
235 
236  } else {
237  auto callback = [this,
238  &update_parameters](FragmentUpdaterType const& update_log) -> void {
239  auto rows_per_column = update_log.getEntryCount();
240  if (rows_per_column == 0) {
241  return;
242  }
243 
244  OffsetVector column_offsets(rows_per_column);
245  ScalarTargetValueVector scalar_target_values(rows_per_column);
246 
247  auto complete_row_block_size = rows_per_column / normalized_cpu_threads();
248  auto partial_row_block_size = rows_per_column % normalized_cpu_threads();
249  auto usable_threads = normalized_cpu_threads();
250  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
251  complete_row_block_size = rows_per_column;
252  partial_row_block_size = 0;
253  usable_threads = 1;
254  }
255 
256  auto process_rows =
257  [&update_log, &update_parameters, &column_offsets, &scalar_target_values](
258  auto type_tag,
259  uint64_t column_index,
260  uint64_t row_start,
261  uint64_t row_count) -> uint64_t {
262  uint64_t rows_processed = 0;
263  for (uint64_t row_index = row_start; row_index < (row_start + row_count);
264  row_index++, rows_processed++) {
265  constexpr auto get_entry_method_sel(MethodSelector::getEntryAt(type_tag));
266  auto const row((update_log.*get_entry_method_sel)(row_index));
267 
268  CHECK(!row.empty());
269  CHECK(row.size() == update_parameters.getUpdateColumnCount() + 1);
270 
271  auto terminal_column_iter = std::prev(row.end());
272  const auto frag_offset_scalar_tv =
273  boost::get<ScalarTargetValue>(&*terminal_column_iter);
274  CHECK(frag_offset_scalar_tv);
275 
276  column_offsets[row_index] =
277  static_cast<uint64_t>(*(boost::get<int64_t>(frag_offset_scalar_tv)));
278  scalar_target_values[row_index] =
279  boost::get<ScalarTargetValue>(row[column_index]);
280  }
281  return rows_processed;
282  };
283 
284  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
285  return (thread_index * complete_row_block_size);
286  };
287 
288  // Iterate over each column
289  for (decltype(update_parameters.getUpdateColumnCount()) column_index = 0;
290  column_index < update_parameters.getUpdateColumnCount();
291  column_index++) {
292  RowProcessingFuturesVector row_processing_futures;
293  row_processing_futures.reserve(usable_threads);
294 
295  auto thread_launcher = [&](auto const& type_tag) {
296  for (unsigned i = 0; i < static_cast<unsigned>(usable_threads); i++) {
297  row_processing_futures.emplace_back(
298  std::async(std::launch::async,
299  std::forward<decltype(process_rows)>(process_rows),
300  type_tag,
301  column_index,
302  get_row_index(i),
303  complete_row_block_size));
304  }
305  if (partial_row_block_size) {
306  row_processing_futures.emplace_back(
307  std::async(std::launch::async,
308  std::forward<decltype(process_rows)>(process_rows),
309  type_tag,
310  column_index,
311  get_row_index(usable_threads),
312  partial_row_block_size));
313  }
314  };
315 
316  if (!update_log.getColumnType(column_index).is_string()) {
317  thread_launcher(NonStringSelector());
318  } else {
319  thread_launcher(StringSelector());
320  }
321 
322  uint64_t rows_processed(0);
323  for (auto& t : row_processing_futures) {
324  t.wait();
325  rows_processed += t.get();
326  }
327 
328  IOFacility::updateColumn(catalog_,
329  update_log.getPhysicalTableId(),
330  update_parameters.getUpdateColumnNames()[column_index],
331  update_log.getFragmentId(),
332  column_offsets,
333  scalar_target_values,
334  update_log.getColumnType(column_index),
335  update_parameters.getTransactionTracker());
336  }
337  };
338  return callback;
339  }
340 }
341 
342 template <typename EXECUTOR_TRAITS, typename IO_FACET, typename FRAGMENT_UPDATER>
345  DeleteTransactionParameters& delete_parameters) {
346  using RowProcessingFuturesVector = std::vector<std::future<uint64_t>>;
347 
348  auto callback = [this,
349  &delete_parameters](FragmentUpdaterType const& update_log) -> void {
350  auto rows_per_column = update_log.getEntryCount();
351  if (rows_per_column == 0) {
352  return;
353  }
354  DeleteVictimOffsetList victim_offsets(rows_per_column);
355 
356  auto complete_row_block_size = rows_per_column / normalized_cpu_threads();
357  auto partial_row_block_size = rows_per_column % normalized_cpu_threads();
358  auto usable_threads = normalized_cpu_threads();
359 
360  if (UNLIKELY(rows_per_column < (unsigned)normalized_cpu_threads())) {
361  complete_row_block_size = rows_per_column;
362  partial_row_block_size = 0;
363  usable_threads = 1;
364  }
365 
366  auto process_rows = [&update_log, &victim_offsets](uint64_t row_start,
367  uint64_t row_count) -> uint64_t {
368  uint64_t rows_processed = 0;
369 
370  for (uint64_t row_index = row_start; row_index < (row_start + row_count);
371  row_index++, rows_processed++) {
372  auto const row(update_log.getEntryAt(row_index));
373  __builtin_prefetch(row.data(), 0, 0);
374 
375  CHECK(!row.empty());
376  auto terminal_column_iter = std::prev(row.end());
377  const auto scalar_tv = boost::get<ScalarTargetValue>(&*terminal_column_iter);
378  CHECK(scalar_tv);
379 
380  uint64_t fragment_offset =
381  static_cast<uint64_t>(*(boost::get<int64_t>(scalar_tv)));
382  victim_offsets[row_index] = fragment_offset;
383  }
384  return rows_processed;
385  };
386 
387  auto get_row_index = [complete_row_block_size](uint64_t thread_index) -> uint64_t {
388  return thread_index * complete_row_block_size;
389  };
390 
391  RowProcessingFuturesVector row_processing_futures;
392  row_processing_futures.reserve(usable_threads);
393 
394  for (unsigned i = 0; i < (unsigned)usable_threads; i++) {
395  row_processing_futures.emplace_back(
396  std::async(std::launch::async,
397  std::forward<decltype(process_rows)>(process_rows),
398  get_row_index(i),
399  complete_row_block_size));
400  }
401  if (partial_row_block_size) {
402  row_processing_futures.emplace_back(
403  std::async(std::launch::async,
404  std::forward<decltype(process_rows)>(process_rows),
405  get_row_index(usable_threads),
406  partial_row_block_size));
407  }
408 
409  uint64_t rows_processed(0);
410  for (auto& t : row_processing_futures) {
411  t.wait();
412  rows_processed += t.get();
413  }
414 
415  IOFacility::deleteColumns(catalog_,
416  update_log.getPhysicalTableId(),
417  update_log.getFragmentId(),
418  victim_offsets,
419  update_log.getColumnType(0),
420  delete_parameters.getTransactionTracker());
421  };
422  return callback;
423 }
424 
425 #endif
static std::function< bool(std::string const &)> yieldColumnValidator(CATALOG_TYPE const &cat, TABLE_DESCRIPTOR_TYPE const *table_descriptor)
typename IOFacility::DeleteVictimOffsetList DeleteVictimOffsetList
std::unique_ptr< TransactionLog > TransactionLogPtr
typename RelAlgExecutorTraits ::ExecutorType ExecutorType
UpdateTransactionParameters(TableDescriptorType const *table_desc, UpdateTargetColumnNamesList const &update_column_names, UpdateTargetTypeList const &target_types, bool varlen_update_required)
IOFacility::TransactionLog transaction_tracker_
#define LOG(tag)
Definition: Logger.h:182
typename IOFacility::UpdateTargetOffsetList UpdateTargetOffsetList
typename IOFacility::ColumnValidationFunction ColumnValidationFunction
static constexpr auto getEntryAt(NonStringSelector)
ExecutorType * executor_
std::function< void(const UpdateLogForFragment &)> Callback
Definition: Execute.h:315
IOFacility::TransactionLog & getTransactionTracker()
FRAGMENTER_TYPE FragmenterType
typename FragmentUpdaterType::Callback UpdateCallback
ColumnValidationFunction yieldColumnValidator(TableDescriptorType const *table_descriptor)
typename IOFacility::UpdateTargetColumnNamesList UpdateTargetColumnNamesList
int normalized_cpu_threads() const
#define UNLIKELY(x)
Definition: likely.h:20
UpdateCallback yieldUpdateCallback(UpdateTransactionParameters &update_parameters)
typename RelAlgExecutorTraits ::TableDescriptorType TableDescriptorType
UpdateCallback yieldDeleteCallback(DeleteTransactionParameters &delete_parameters)
StorageIOFacility(ExecutorType *executor, CatalogType const &catalog)
std::vector< uint64_t > UpdateTargetOffsetList
typename UpdateTargetColumnNamesList::value_type UpdateTargetColumnNameType
std::vector< uint64_t > DeleteVictimOffsetList
CatalogType const & catalog_
#define CHECK(condition)
Definition: Logger.h:187
static void deleteColumns(CATALOG_TYPE const &cat, TABLE_ID_TYPE const &&table_id, FRAGMENT_ID_TYPE const frag_id, VICTIM_OFFSET_LIST &victims, COLUMN_TYPE_INFO const &col_type_info, TransactionLog &transaction_tracker)
std::vector< TargetMetaInfo > UpdateTargetTypeList
static void updateColumn(CATALOG_TYPE const &cat, TABLE_ID_TYPE const &&table_id, COLUMN_NAME_TYPE const &column_name, FRAGMENT_ID_TYPE const frag_id, FRAGMENT_OFFSET_LIST_TYPE const &frag_offsets, UPDATE_VALUES_LIST_TYPE const &update_values, COLUMN_TYPE_INFO const &col_type_info, TransactionLog &transaction_tracker)
typename RelAlgExecutorTraits ::CatalogType CatalogType
int cpu_threads()
Definition: thread_count.h:23
std::vector< std::string > UpdateTargetColumnNamesList
typename FragmenterType::ModifyTransactionTracker TransactionLog
static constexpr auto getEntryAt(StringSelector)
std::function< bool(std::string const &)> ColumnValidationFunction
typename IOFacility::UpdateTargetTypeList UpdateTargetTypeList
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156