OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ExecuteUpdate.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/Execute.h"
18 
24 
26  size_t const fragment_index,
27  const std::shared_ptr<ResultSet>& rs)
28  : fragment_info_(fragment_info), fragment_index_(fragment_index), rs_(rs) {
29  rs->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
30 }
31 
32 std::vector<TargetValue> UpdateLogForFragment::getEntryAt(const size_t index) const {
33  return rs_->getRowAtNoTranslations(index);
34 }
35 
37  const size_t index) const {
38  return rs_->getRowAt(index);
39 }
40 
41 size_t const UpdateLogForFragment::getRowCount() const {
42  return rs_->rowCount();
43 }
44 
46  const {
47  return fragment_info_;
48 }
49 
51  return rs_->entryCount();
52 }
53 
55  return fragment_index_;
56 }
57 
58 SQLTypeInfo UpdateLogForFragment::getColumnType(const size_t col_idx) const {
59  return rs_->getColType(col_idx);
60 }
61 
62 void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in,
63  const std::vector<InputTableInfo>& table_infos,
64  const CompilationOptions& co,
65  const ExecutionOptions& eo,
67  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
69  const bool is_agg) {
70  CHECK(cb);
71  VLOG(1) << "Executing update/delete work unit:" << ra_exe_unit_in;
72 
73  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in, co);
74  ColumnCacheMap column_cache;
75 
76  ColumnFetcher column_fetcher(this, column_cache);
77  CHECK_GT(ra_exe_unit.input_descs.size(), size_t(0));
78  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
79  const auto& outer_fragments = table_infos.front().info.fragments;
80 
81  std::vector<FragmentsPerTable> fragments = {{0, {0}}};
82  for (size_t tab_idx = 1; tab_idx < ra_exe_unit.input_descs.size(); tab_idx++) {
83  int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
84  CHECK_EQ(table_infos[tab_idx].table_id, table_id);
85  const auto& fragmentsPerTable = table_infos[tab_idx].info.fragments;
86  FragmentsPerTable entry = {table_id, {}};
87  for (size_t innerFragId = 0; innerFragId < fragmentsPerTable.size(); innerFragId++) {
88  entry.fragment_ids.push_back(innerFragId);
89  }
90  fragments.push_back(entry);
91  }
92 
93  if (outer_fragments.empty()) {
94  return;
95  }
96 
97  const auto max_tuple_count_fragment_it = std::max_element(
98  outer_fragments.begin(), outer_fragments.end(), [](const auto& a, const auto& b) {
99  return a.getNumTuples() < b.getNumTuples();
100  });
101  CHECK(max_tuple_count_fragment_it != outer_fragments.end());
102  int64_t global_max_groups_buffer_entry_guess =
103  max_tuple_count_fragment_it->getNumTuples();
104  if (is_agg) {
105  global_max_groups_buffer_entry_guess = std::min(
106  2 * global_max_groups_buffer_entry_guess, static_cast<int64_t>(100'000'000));
107  }
108 
109  auto query_comp_desc = std::make_unique<QueryCompilationDescriptor>();
110  std::unique_ptr<QueryMemoryDescriptor> query_mem_desc;
111  {
112  auto clock_begin = timer_start();
113  std::lock_guard<std::mutex> compilation_lock(compilation_mutex_);
114  compilation_queue_time_ms_ += timer_stop(clock_begin);
115 
116  query_mem_desc = query_comp_desc->compile(global_max_groups_buffer_entry_guess,
117  8,
118  /*has_cardinality_estimation=*/true,
119  ra_exe_unit,
120  table_infos,
121  column_fetcher,
122  co,
123  eo,
124  nullptr,
125  this);
126  }
127  CHECK(query_mem_desc);
128 
129  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
130  ++fragment_index) {
131  const int64_t crt_fragment_tuple_count =
132  outer_fragments[fragment_index].getNumTuples();
133  if (crt_fragment_tuple_count == 0) {
134  // nothing to update
135  continue;
136  }
137 
138  SharedKernelContext shared_context(table_infos);
139  fragments[0] = {table_id, {fragment_index}};
140  {
141  ExecutionKernel current_fragment_kernel(ra_exe_unit,
143  0,
144  eo,
145  column_fetcher,
146  *query_comp_desc,
147  *query_mem_desc,
148  fragments,
150  /*render_info=*/nullptr,
151  /*rowid_lookup_key=*/-1,
153 
154  auto clock_begin = timer_start();
155  std::lock_guard<std::mutex> kernel_lock(kernel_mutex_);
156  kernel_queue_time_ms_ += timer_stop(clock_begin);
157 
158  current_fragment_kernel.run(this, shared_context);
159  }
160  const auto& proj_fragment_results = shared_context.getFragmentResults();
161  if (proj_fragment_results.empty()) {
162  continue;
163  }
164  const auto& proj_fragment_result = proj_fragment_results[0];
165  const auto proj_result_set = proj_fragment_result.first;
166  CHECK(proj_result_set);
167  cb({outer_fragments[fragment_index], fragment_index, proj_result_set});
168  }
169 }
SQLTypeInfo getColumnType(const size_t col_idx) const
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:86
size_t const getFragmentIndex() const
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
#define CHECK_GT(x, y)
Definition: Logger.h:209
Container for compilation results and assorted options for a single execution unit.
std::function< void(const UpdateLogForFragment &)> Callback
Definition: Execute.h:279
void run(Executor *executor, SharedKernelContext &shared_context)
CHECK(cgen_state)
std::vector< TargetValue > getEntryAt(const size_t index) const override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:78
FragmentInfoType const & getFragmentInfo() const
size_t fragment_index_
Definition: Execute.h:285
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::shared_ptr< ResultSet > rs_
Definition: Execute.h:286
FragmentInfoType const & fragment_info_
Definition: Execute.h:284
ThreadId thread_id()
Definition: Logger.cpp:715
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
std::vector< size_t > fragment_ids
UpdateLogForFragment(FragmentInfoType const &fragment_info, size_t const, const std::shared_ptr< ResultSet > &rs)
size_t const getRowCount() const override
Descriptor for the fragments required for an execution kernel.
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
size_t const getEntryCount() const override
void executeUpdate(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg)