OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ExecuteUpdate.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ColumnFetcher.h"
20 #include "Execute.h"
21 #include "RelAlgExecutor.h"
22 
24  size_t const fragment_index,
25  const std::shared_ptr<ResultSet>& rs)
26  : fragment_info_(fragment_info), fragment_index_(fragment_index), rs_(rs) {
27  rs->setGeoReturnType(ResultSet::GeoReturnType::GeoTargetValue);
28 }
29 
30 std::vector<TargetValue> UpdateLogForFragment::getEntryAt(const size_t index) const {
31  return rs_->getRowAtNoTranslations(index);
32 }
33 
35  const size_t index) const {
36  return rs_->getRowAt(index);
37 }
38 
39 size_t const UpdateLogForFragment::getRowCount() const {
40  return rs_->rowCount();
41 }
42 
44  const {
45  return fragment_info_;
46 }
47 
49  return rs_->entryCount();
50 }
51 
53  return fragment_index_;
54 }
55 
56 SQLTypeInfo UpdateLogForFragment::getColumnType(const size_t col_idx) const {
57  return rs_->getColType(col_idx);
58 }
59 
60 void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in,
61  const std::vector<InputTableInfo>& table_infos,
62  const CompilationOptions& co,
63  const ExecutionOptions& eo,
64  const Catalog_Namespace::Catalog& cat,
65  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
67  const bool is_agg) {
68  CHECK(cb);
69  const auto ra_exe_unit = addDeletedColumn(ra_exe_unit_in);
70  ColumnCacheMap column_cache;
71 
72  const auto count =
73  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
74  kCOUNT,
75  nullptr,
76  false,
77  nullptr);
78  const auto count_all_exe_unit = create_count_all_execution_unit(ra_exe_unit, count);
79 
80  ColumnFetcher column_fetcher(this, column_cache);
81  CHECK_GT(ra_exe_unit.input_descs.size(), size_t(0));
82  const auto table_id = ra_exe_unit.input_descs[0].getTableId();
83  const auto& outer_fragments = table_infos.front().info.fragments;
84 
85  std::vector<FragmentsPerTable> fragments = {{0, {0}}};
86  for (size_t tab_idx = 1; tab_idx < ra_exe_unit.input_descs.size(); tab_idx++) {
87  int table_id = ra_exe_unit.input_descs[tab_idx].getTableId();
88  CHECK_EQ(table_infos[tab_idx].table_id, table_id);
89  const auto& fragmentsPerTable = table_infos[tab_idx].info.fragments;
90  FragmentsPerTable entry = {table_id, {}};
91  for (size_t innerFragId = 0; innerFragId < fragmentsPerTable.size(); innerFragId++) {
92  entry.fragment_ids.push_back(innerFragId);
93  }
94  fragments.push_back(entry);
95  }
96 
97  // There could be benefit to multithread this once we see where the bottle necks really
98  // are
99  for (size_t fragment_index = 0; fragment_index < outer_fragments.size();
100  ++fragment_index) {
101  ExecutionDispatch current_fragment_execution_dispatch(
102  this, ra_exe_unit, table_infos, cat, row_set_mem_owner, nullptr);
103 
104  const int64_t crt_fragment_tuple_count =
105  outer_fragments[fragment_index].getNumTuples();
106  int64_t max_groups_buffer_entry_guess = crt_fragment_tuple_count;
107  if (is_agg) {
108  max_groups_buffer_entry_guess =
109  std::min(2 * max_groups_buffer_entry_guess, static_cast<int64_t>(100'000'000));
110  }
111 
112  const auto execution_descriptors = current_fragment_execution_dispatch.compile(
113  max_groups_buffer_entry_guess, 8, co, eo, column_fetcher, true);
114  // We may want to consider in the future allowing this to execute on devices other
115  // than CPU
116 
117  fragments[0] = {table_id, {fragment_index}};
118 
119  current_fragment_execution_dispatch.run(
120  co.device_type_,
121  0,
122  eo,
123  column_fetcher,
124  *std::get<QueryCompilationDescriptorOwned>(execution_descriptors),
125  *std::get<QueryMemoryDescriptorOwned>(execution_descriptors),
126  fragments,
128  -1);
129  const auto& proj_fragment_results =
130  current_fragment_execution_dispatch.getFragmentResults();
131  if (proj_fragment_results.empty()) {
132  continue;
133  }
134  const auto& proj_fragment_result = proj_fragment_results[0];
135  const auto proj_result_set = proj_fragment_result.first;
136  CHECK(proj_result_set);
137  cb({outer_fragments[fragment_index], fragment_index, proj_result_set});
138  }
139 }
SQLTypeInfo getColumnType(const size_t col_idx) const
std::vector< std::pair< ResultSetPtr, std::vector< size_t > > > & getFragmentResults()
bool is_agg(const Analyzer::Expr *expr)
#define CHECK_EQ(x, y)
Definition: Logger.h:198
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
std::tuple< QueryCompilationDescriptorOwned, QueryMemoryDescriptorOwned > compile(const size_t max_groups_buffer_entry_guess, const int8_t crt_min_byte_width, const CompilationOptions &co, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const bool has_cardinality_estimation)
size_t const getFragmentIndex() const
#define CHECK_GT(x, y)
Definition: Logger.h:202
Container for compilation results and assorted options for a single execution unit.
std::function< void(const UpdateLogForFragment &)> Callback
Definition: Execute.h:318
RelAlgExecutionUnit create_count_all_execution_unit(const RelAlgExecutionUnit &ra_exe_unit, std::shared_ptr< Analyzer::Expr > replacement_target)
void run(const ExecutorDeviceType chosen_device_type, int chosen_device_id, const ExecutionOptions &eo, const ColumnFetcher &column_fetcher, const QueryCompilationDescriptor &query_comp_desc, const QueryMemoryDescriptor &query_mem_desc, const FragmentsList &frag_ids, const ExecutorDispatchMode kernel_dispatch_mode, const int64_t rowid_lookup_key)
CHECK(cgen_state)
std::vector< TargetValue > getEntryAt(const size_t index) const override
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:79
FragmentInfoType const & getFragmentInfo() const
bool g_bigint_count
size_t fragment_index_
Definition: Execute.h:322
std::vector< TargetValue > getTranslatedEntryAt(const size_t index) const override
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:852
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > rs_
Definition: Execute.h:323
Definition: sqldefs.h:71
FragmentInfoType const & fragment_info_
Definition: Execute.h:321
void executeUpdate(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &table_infos, const CompilationOptions &co, const ExecutionOptions &eo, const Catalog_Namespace::Catalog &cat, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const UpdateLogForFragment::Callback &cb, const bool is_agg=false)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
Definition: sqltypes.h:48
std::vector< size_t > fragment_ids
UpdateLogForFragment(FragmentInfoType const &fragment_info, size_t const, const std::shared_ptr< ResultSet > &rs)
size_t const getRowCount() const override
Descriptor for the fragments required for a query.
size_t const getEntryCount() const override