OmniSciDB  04ee39c94c
ExecuteUpdate.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "RelAlgExecutor.h"
22 
24  size_t const fragment_index,
25  const std::shared_ptr<ResultSet>& rs)
26  : fragment_info_(fragment_info), fragment_index_(fragment_index), rs_(rs) {
27  rs->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
28 }
29 
30 std::vector<TargetValue> UpdateLogForFragment::getEntryAt(const size_t index) const {
31  return rs_->getRowAtNoTranslations(index);
32 }
33 
35  const size_t index) const {
36  return rs_->getRowAt(index);
37 }
38 
40  return getEntryCount();
41 }
42 
44  const {
45  return fragment_info_;
46 }
47 
49  return rs_->entryCount();
50 }
51 
53  return fragment_index_;
54 }
55 
56 SQLTypeInfo UpdateLogForFragment::getColumnType(const size_t col_idx) const {
57  return rs_->getColType(col_idx);
58 }
59 
60 void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in,
61  const InputTableInfo& table_info,
62  const CompilationOptions& co,
63  const ExecutionOptions& eo,
64  const Catalog_Namespace::Catalog& cat,
65  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
67  CHECK(cb);
68  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
69  ColumnCacheMap column_cache;
70 
71  const auto count =
72  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
73  kCOUNT,
74  nullptr,
75  false,
76  nullptr);
77  const auto count_all_exe_unit = create_count_all_execution_unit(ra_exe_unit, count);
78 
79  std::vector<InputTableInfo> table_infos{table_info};
80  ExecutionDispatch execution_dispatch(
81  this, count_all_exe_unit, table_infos, cat, row_set_mem_owner, nullptr);
82  ColumnFetcher column_fetcher(this, column_cache);
83  const auto execution_descriptors =
84  execution_dispatch.compile(0, 8, co, eo, column_fetcher, false);
85  CHECK_EQ(size_t(1), ra_exe_unit.input_descs.size());
86  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
87  const auto& outer_fragments = table_info.info.fragments;
88  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
89  ++fragment_index) {
90  // We may want to consider in the future allowing this to execute on devices other
91  // than CPU
92  execution_dispatch.run(
93  co.device_type_,
94  0,
95  eo,
96  column_fetcher,
97  *std::get<QueryCompilationDescriptorOwned>(execution_descriptors),
98  *std::get<QueryMemoryDescriptorOwned>(execution_descriptors),
99  {{table_id, {fragment_index}}},
101  -1);
102  }
103  // Further optimization possible here to skip fragments
104  CHECK_EQ(outer_fragments.size(), execution_dispatch.getFragmentResults().size());
105  // There could be benefit to multithread this once we see where the bottle necks really
106  // are
107  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
108  ++fragment_index) {
109  const auto& fragment_results =
110  execution_dispatch.getFragmentResults()[fragment_index];
111  const auto count_result_set = fragment_results.first;
112  CHECK(count_result_set);
113  const auto count_row = count_result_set->getNextRow(false, false);
114  CHECK_EQ(size_t(1), count_row.size());
115  const auto& count_tv = count_row.front();
116  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
117  CHECK(count_scalar_tv);
118  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
119  CHECK(count_ptr);
120  ExecutionDispatch current_fragment_execution_dispatch(
121  this, ra_exe_unit, table_infos, cat, row_set_mem_owner, nullptr);
122  const auto execution_descriptors = current_fragment_execution_dispatch.compile(
123  *count_ptr, 8, co, eo, column_fetcher, false);
124  // We may want to consider in the future allowing this to execute on devices other
125  // than CPU
126  current_fragment_execution_dispatch.run(
127  co.device_type_,
128  0,
129  eo,
130  column_fetcher,
131  *std::get<QueryCompilationDescriptorOwned>(execution_descriptors),
132  *std::get<QueryMemoryDescriptorOwned>(execution_descriptors),
133  {FragmentsPerTable{table_id, {fragment_index}}},
135  -1);
136  const auto& proj_fragment_results =
137  current_fragment_execution_dispatch.getFragmentResults();
138  if (proj_fragment_results.empty()) {
139  continue;
140  }
141  const auto& proj_fragment_result = proj_fragment_results[0];
142  const auto proj_result_set = proj_fragment_result.first;
143  CHECK(proj_result_set);
144  cb({outer_fragments[fragment_index], fragment_index, proj_result_set});
145  }
146 }
SQLTypeInfo getColumnType(const size_t col_idx) const
#define CHECK_EQ(x, y)
Definition: Logger.h:195
Fragmenter_Namespace::TableInfo info
Definition: InputMetadata.h:35
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
std::tuple< QueryCompilationDescriptorOwned, QueryMemoryDescriptorOwned > compile(const size_t max_groups_buffer_entry_guess, const int8_t crt_min_byte_width, const CompilationOptions &co, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const bool has_cardinality_estimation)
FragmentInfoType const & getFragmentInfo() const
std::deque< FragmentInfo > fragments
Definition: Fragmenter.h:167
Container for compilation results and assorted options for a single execution unit.
std::function< void(const UpdateLogForFragment &)> Callback
Definition: Execute.h:315
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
void run(const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_ids, const ExecutorDispatchMode kernel_dispatch_mode, const int64_t rowid_lookup_key)
std::vector< TargetValue > getEntryAt(const size_t index) const override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:79
bool g_bigint_count
size_t fragment_index_
Definition: Execute.h:319
size_t const getEntryCount() const
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:823
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > rs_
Definition: Execute.h:320
size_t count() const override
Definition: sqldefs.h:71
FragmentInfoType const & fragment_info_
Definition: Execute.h:318
#define CHECK(condition)
Definition: Logger.h:187
size_t const getFragmentIndex() const
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
Definition: sqltypes.h:47
UpdateLogForFragment(FragmentInfoType const &fragment_info, size_t const, const std::shared_ptr< ResultSet > &rs)
void executeUpdate(const RelAlgExecutionUnit &ra_exe_unit, const InputTableInfo &table_info, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb) __attribute__((hot))
Descriptor for the fragments required for a query.