OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
InputMetadata.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "InputMetadata.h"
18 #include "Execute.h"
19 
20 #include "../Fragmenter/Fragmenter.h"
21 
22 #include <future>
23 
24 InputTableInfoCache::InputTableInfoCache(Executor* executor) : executor_(executor) {}
25 
26 namespace {
27 
29  const Fragmenter_Namespace::TableInfo& table_info) {
30  Fragmenter_Namespace::TableInfo table_info_copy;
31  table_info_copy.chunkKeyPrefix = table_info.chunkKeyPrefix;
32  table_info_copy.fragments = table_info.fragments;
33  table_info_copy.setPhysicalNumTuples(table_info.getPhysicalNumTuples());
34  return table_info_copy;
35 }
36 
38  const std::vector<const TableDescriptor*>& shard_tables) {
39  size_t total_number_of_tuples{0};
40  Fragmenter_Namespace::TableInfo table_info_all_shards;
41  for (const TableDescriptor* shard_table : shard_tables) {
42  CHECK(shard_table->fragmenter);
43  const auto& shard_metainfo = shard_table->fragmenter->getFragmentsForQuery();
44  total_number_of_tuples += shard_metainfo.getPhysicalNumTuples();
45  table_info_all_shards.fragments.reserve(table_info_all_shards.fragments.size() +
46  shard_metainfo.fragments.size());
47  table_info_all_shards.fragments.insert(table_info_all_shards.fragments.end(),
48  shard_metainfo.fragments.begin(),
49  shard_metainfo.fragments.end());
50  }
51  table_info_all_shards.setPhysicalNumTuples(total_number_of_tuples);
52  return table_info_all_shards;
53 }
54 
55 } // namespace
56 
58  const auto it = cache_.find(table_id);
59  if (it != cache_.end()) {
60  const auto& table_info = it->second;
61  return copy_table_info(table_info);
62  }
63  const auto cat = executor_->getCatalog();
64  CHECK(cat);
65  const auto td = cat->getMetadataForTable(table_id);
66  CHECK(td);
67  const auto shard_tables = cat->getPhysicalTablesDescriptors(td);
68  auto table_info = build_table_info(shard_tables);
69  auto it_ok = cache_.emplace(table_id, copy_table_info(table_info));
70  CHECK(it_ok.second);
71  return copy_table_info(table_info);
72 }
73 
75  decltype(cache_)().swap(cache_);
76 }
77 
78 namespace {
79 
80 bool uses_int_meta(const SQLTypeInfo& col_ti) {
81  return col_ti.is_integer() || col_ti.is_decimal() || col_ti.is_time() ||
82  col_ti.is_boolean() ||
83  (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT);
84 }
85 
87  std::vector<Fragmenter_Namespace::FragmentInfo> result;
88  if (rows) {
89  result.resize(1);
90  auto& fragment = result.front();
91  fragment.fragmentId = 0;
92  fragment.deviceIds.resize(3);
93  fragment.resultSet = rows.get();
94  fragment.resultSetMutex.reset(new std::mutex());
95  }
97  table_info.fragments = result;
98  return table_info;
99 }
100 
101 void collect_table_infos(std::vector<InputTableInfo>& table_infos,
102  const std::vector<InputDescriptor>& input_descs,
103  Executor* executor) {
104  const auto temporary_tables = executor->getTemporaryTables();
105  const auto cat = executor->getCatalog();
106  CHECK(cat);
107  std::unordered_map<int, size_t> info_cache;
108  for (const auto& input_desc : input_descs) {
109  const auto table_id = input_desc.getTableId();
110  const auto cached_index_it = info_cache.find(table_id);
111  if (cached_index_it != info_cache.end()) {
112  CHECK_LT(cached_index_it->second, table_infos.size());
113  table_infos.push_back(
114  {table_id, copy_table_info(table_infos[cached_index_it->second].info)});
115  continue;
116  }
117  if (input_desc.getSourceType() == InputSourceType::RESULT) {
118  CHECK_LT(table_id, 0);
119  CHECK(temporary_tables);
120  const auto it = temporary_tables->find(table_id);
121  LOG_IF(FATAL, it == temporary_tables->end())
122  << "Failed to find previous query result for node " << -table_id;
123  table_infos.push_back({table_id, synthesize_table_info(it->second)});
124  } else {
125  CHECK(input_desc.getSourceType() == InputSourceType::TABLE);
126  table_infos.push_back({table_id, executor->getTableInfo(table_id)});
127  }
128  CHECK(!table_infos.empty());
129  info_cache.insert(std::make_pair(table_id, table_infos.size() - 1));
130  }
131 }
132 
133 } // namespace
134 
135 ChunkMetadataMap synthesize_metadata(const ResultSet* rows) {
136  rows->moveToBegin();
137  std::vector<std::vector<std::unique_ptr<Encoder>>> dummy_encoders;
138  const size_t worker_count = use_parallel_algorithms(*rows) ? cpu_threads() : 1;
139  for (size_t worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
140  dummy_encoders.emplace_back();
141  for (size_t i = 0; i < rows->colCount(); ++i) {
142  const auto& col_ti = rows->getColType(i);
143  dummy_encoders.back().emplace_back(Encoder::Create(nullptr, col_ti));
144  }
145  }
146  const auto do_work = [rows](const std::vector<TargetValue>& crt_row,
147  std::vector<std::unique_ptr<Encoder>>& dummy_encoders) {
148  for (size_t i = 0; i < rows->colCount(); ++i) {
149  const auto& col_ti = rows->getColType(i);
150  const auto& col_val = crt_row[i];
151  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
152  CHECK(scalar_col_val);
153  if (uses_int_meta(col_ti)) {
154  const auto i64_p = boost::get<int64_t>(scalar_col_val);
155  CHECK(i64_p);
156  dummy_encoders[i]->updateStats(*i64_p, *i64_p == inline_int_null_val(col_ti));
157  } else if (col_ti.is_fp()) {
158  switch (col_ti.get_type()) {
159  case kFLOAT: {
160  const auto float_p = boost::get<float>(scalar_col_val);
161  CHECK(float_p);
162  dummy_encoders[i]->updateStats(*float_p,
163  *float_p == inline_fp_null_val(col_ti));
164  break;
165  }
166  case kDOUBLE: {
167  const auto double_p = boost::get<double>(scalar_col_val);
168  CHECK(double_p);
169  dummy_encoders[i]->updateStats(*double_p,
170  *double_p == inline_fp_null_val(col_ti));
171  break;
172  }
173  default:
174  CHECK(false);
175  }
176  } else {
177  throw std::runtime_error(col_ti.get_type_name() +
178  " is not supported in temporary table.");
179  }
180  }
181  };
182  if (use_parallel_algorithms(*rows)) {
183  const size_t worker_count = cpu_threads();
184  std::vector<std::future<void>> compute_stats_threads;
185  const auto entry_count = rows->entryCount();
186  for (size_t i = 0,
187  start_entry = 0,
188  stride = (entry_count + worker_count - 1) / worker_count;
189  i < worker_count && start_entry < entry_count;
190  ++i, start_entry += stride) {
191  const auto end_entry = std::min(start_entry + stride, entry_count);
192  compute_stats_threads.push_back(std::async(
193  std::launch::async,
194  [rows, &do_work, &dummy_encoders](
195  const size_t start, const size_t end, const size_t worker_idx) {
196  for (size_t i = start; i < end; ++i) {
197  const auto crt_row = rows->getRowAtNoTranslations(i);
198  if (!crt_row.empty()) {
199  do_work(crt_row, dummy_encoders[worker_idx]);
200  }
201  }
202  },
203  start_entry,
204  end_entry,
205  i));
206  }
207  for (auto& child : compute_stats_threads) {
208  child.wait();
209  }
210  for (auto& child : compute_stats_threads) {
211  child.get();
212  }
213  } else {
214  while (true) {
215  auto crt_row = rows->getNextRow(false, false);
216  if (crt_row.empty()) {
217  break;
218  }
219  do_work(crt_row, dummy_encoders[0]);
220  }
221  rows->moveToBegin();
222  }
223  ChunkMetadataMap metadata_map;
224  for (size_t worker_idx = 1; worker_idx < worker_count; ++worker_idx) {
225  CHECK_LT(worker_idx, dummy_encoders.size());
226  const auto& worker_encoders = dummy_encoders[worker_idx];
227  for (size_t i = 0; i < rows->colCount(); ++i) {
228  dummy_encoders[0][i]->reduceStats(*worker_encoders[i]);
229  }
230  }
231  for (size_t i = 0; i < rows->colCount(); ++i) {
232  const auto it_ok =
233  metadata_map.emplace(i, dummy_encoders[0][i]->getMetadata(rows->getColType(i)));
234  CHECK(it_ok.second);
235  }
236  return metadata_map;
237 }
238 
239 size_t get_frag_count_of_table(const int table_id, Executor* executor) {
240  const auto temporary_tables = executor->getTemporaryTables();
241  CHECK(temporary_tables);
242  auto it = temporary_tables->find(table_id);
243  if (it != temporary_tables->end()) {
244  CHECK_GE(int(0), table_id);
245  return size_t(1);
246  } else {
247  const auto table_info = executor->getTableInfo(table_id);
248  return table_info.fragments.size();
249  }
250 }
251 
252 std::vector<InputTableInfo> get_table_infos(
253  const std::vector<InputDescriptor>& input_descs,
254  Executor* executor) {
255  std::vector<InputTableInfo> table_infos;
256  collect_table_infos(table_infos, input_descs, executor);
257  return table_infos;
258 }
259 
260 std::vector<InputTableInfo> get_table_infos(const RelAlgExecutionUnit& ra_exe_unit,
261  Executor* executor) {
263  std::vector<InputTableInfo> table_infos;
264  collect_table_infos(table_infos, ra_exe_unit.input_descs, executor);
265  return table_infos;
266 }
267 
272  }
273  return chunkMetadataMap;
274 }
275 
277  std::unique_ptr<std::lock_guard<std::mutex>> lock;
278  if (resultSetMutex) {
279  lock.reset(new std::lock_guard<std::mutex>(*resultSetMutex));
280  }
281  CHECK_EQ(!!resultSet, !!resultSetMutex);
282  if (resultSet && !synthesizedNumTuplesIsValid) {
283  numTuples = resultSet->rowCount();
284  synthesizedNumTuplesIsValid = true;
285  }
286  return numTuples;
287 }
288 
290  if (!fragments.empty() && fragments.front().resultSet) {
291  return fragments.front().getNumTuples();
292  }
293  return numTuples;
294 }
295 
297  if (!fragments.empty() && fragments.front().resultSet) {
298  return fragments.front().resultSet->entryCount();
299  }
300  return numTuples;
301 }
302 
304  if (!fragments.empty() && fragments.front().resultSet) {
305  return fragments.front().resultSet->entryCount();
306  }
307  size_t fragment_num_tupples_upper_bound = 0;
308  for (const auto& fragment : fragments) {
309  fragment_num_tupples_upper_bound =
310  std::max(fragment.getNumTuples(), fragment_num_tupples_upper_bound);
311  }
312  return fragment_num_tupples_upper_bound;
313 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::string cat(Ts &&...args)
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:1007
Fragmenter_Namespace::TableInfo copy_table_info(const Fragmenter_Namespace::TableInfo &table_info)
ChunkMetadataMap synthesize_metadata(const ResultSet *rows)
static Encoder * Create(Data_Namespace::AbstractBuffer *buffer, const SQLTypeInfo sqlType)
Definition: Encoder.cpp:26
Executor * executor_
Definition: InputMetadata.h:48
std::vector< InputDescriptor > input_descs
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< ResultSet > ResultSetPtr
std::vector< FragmentInfo > fragments
Definition: Fragmenter.h:161
std::vector< int > chunkKeyPrefix
Definition: Fragmenter.h:160
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:423
#define LOG_IF(severity, condition)
Definition: Logger.h:287
std::map< int, std::shared_ptr< ChunkMetadata >> ChunkMetadataMap
size_t getPhysicalNumTuples() const
Definition: Fragmenter.h:154
Fragmenter_Namespace::TableInfo getTableInfo(const int table_id)
bool uses_int_meta(const SQLTypeInfo &col_ti)
bool is_integer() const
Definition: sqltypes.h:419
#define INJECT_TIMER(DESC)
Definition: measure.h:93
size_t getFragmentNumTuplesUpperBound() const
Fragmenter_Namespace::TableInfo synthesize_table_info(const ResultSetPtr &rows)
const ChunkMetadataMap & getChunkMetadataMap() const
bool is_boolean() const
Definition: sqltypes.h:424
#define CHECK_LT(x, y)
Definition: Logger.h:207
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
InputTableInfoCache(Executor *executor)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
void setPhysicalNumTuples(const size_t physNumTuples)
Definition: Fragmenter.h:156
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
void collect_table_infos(std::vector< InputTableInfo > &table_infos, const std::vector< InputDescriptor > &input_descs, Executor *executor)
bool is_string() const
Definition: sqltypes.h:417
specifies the content in-memory of a row in the table metadata table
std::unordered_map< int, Fragmenter_Namespace::TableInfo > cache_
Definition: InputMetadata.h:47
size_t get_frag_count_of_table(const int table_id, Executor *executor)
int cpu_threads()
Definition: thread_count.h:24
Fragmenter_Namespace::TableInfo build_table_info(const std::vector< const TableDescriptor * > &shard_tables)
bool is_decimal() const
Definition: sqltypes.h:420