InputMetadata.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "InputMetadata.h"
#include "Execute.h"

#include "../Fragmenter/Fragmenter.h"

#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>
#include <tbb/task_group.h>
#include <future>

InputTableInfoCache::InputTableInfoCache(Executor* executor) : executor_(executor) {}

namespace {

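// Returns an independent copy of a TableInfo (chunk key prefix, fragment list, and
// physical tuple count) so callers never hold references into the cache.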
Fragmenter_Namespace::TableInfo copy_table_info(
    const Fragmenter_Namespace::TableInfo& table_info) {
  Fragmenter_Namespace::TableInfo table_info_copy;
  table_info_copy.chunkKeyPrefix = table_info.chunkKeyPrefix;
  table_info_copy.fragments = table_info.fragments;
  table_info_copy.setPhysicalNumTuples(table_info.getPhysicalNumTuples());
  return table_info_copy;
}

}  // namespace

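// Merges the fragment metadata of every physical shard table into one TableInfo,
// summing the per-shard physical tuple counts.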
Fragmenter_Namespace::TableInfo build_table_info(
    const std::vector<const TableDescriptor*>& shard_tables) {
  size_t total_number_of_tuples{0};
  Fragmenter_Namespace::TableInfo table_info_all_shards;
  for (const TableDescriptor* shard_table : shard_tables) {
    CHECK(shard_table->fragmenter);
    const auto& shard_metainfo = shard_table->fragmenter->getFragmentsForQuery();
    total_number_of_tuples += shard_metainfo.getPhysicalNumTuples();
    table_info_all_shards.fragments.reserve(table_info_all_shards.fragments.size() +
                                            shard_metainfo.fragments.size());
    table_info_all_shards.fragments.insert(table_info_all_shards.fragments.end(),
                                           shard_metainfo.fragments.begin(),
                                           shard_metainfo.fragments.end());
  }
  table_info_all_shards.setPhysicalNumTuples(total_number_of_tuples);
  return table_info_all_shards;
}

// Looks up (or builds and caches) the TableInfo for a physical table id. Cached
// entries are always returned by value via copy_table_info().
Fragmenter_Namespace::TableInfo InputTableInfoCache::getTableInfo(const int table_id) {
  const auto it = cache_.find(table_id);
  if (it != cache_.end()) {
    const auto& table_info = it->second;
    return copy_table_info(table_info);
  }
  const auto cat = executor_->getCatalog();
  CHECK(cat);
  const auto td = cat->getMetadataForTable(table_id);
  CHECK(td);
  const auto shard_tables = cat->getPhysicalTablesDescriptors(td);
  auto table_info = build_table_info(shard_tables);
  auto it_ok = cache_.emplace(table_id, copy_table_info(table_info));
  CHECK(it_ok.second);
  return copy_table_info(table_info);
}

void InputTableInfoCache::clear() {
  decltype(cache_)().swap(cache_);
}

namespace {

bool uses_int_meta(const SQLTypeInfo& col_ti) {
  return col_ti.is_integer() || col_ti.is_decimal() || col_ti.is_time() ||
         col_ti.is_boolean() ||
         (col_ti.is_string() && col_ti.get_compression() == kENCODING_DICT);
}

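// Wraps a query result set in a single synthetic FragmentInfo so an intermediate
// (temporary) result can be treated like a one-fragment table downstream.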
Fragmenter_Namespace::TableInfo synthesize_table_info(const ResultSetPtr& rows) {
  std::vector<Fragmenter_Namespace::FragmentInfo> result;
  if (rows) {
    result.resize(1);
    auto& fragment = result.front();
    fragment.fragmentId = 0;
    fragment.deviceIds.resize(3);
    fragment.resultSet = rows.get();
    fragment.resultSetMutex.reset(new std::mutex());
  }
  Fragmenter_Namespace::TableInfo table_info;
  table_info.fragments = result;
  return table_info;
}

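// Resolves each input descriptor to a TableInfo: result (temporary) inputs, which use
// negative table ids, are synthesized from their result sets, while physical tables go
// through the executor. Repeated table ids reuse the first resolved entry.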
void collect_table_infos(std::vector<InputTableInfo>& table_infos,
                         const std::vector<InputDescriptor>& input_descs,
                         Executor* executor) {
  const auto temporary_tables = executor->getTemporaryTables();
  const auto cat = executor->getCatalog();
  CHECK(cat);
  std::unordered_map<int, size_t> info_cache;
  for (const auto& input_desc : input_descs) {
    const auto table_id = input_desc.getTableId();
    const auto cached_index_it = info_cache.find(table_id);
    if (cached_index_it != info_cache.end()) {
      CHECK_LT(cached_index_it->second, table_infos.size());
      table_infos.push_back(
          {table_id, copy_table_info(table_infos[cached_index_it->second].info)});
      continue;
    }
    if (input_desc.getSourceType() == InputSourceType::RESULT) {
      CHECK_LT(table_id, 0);
      CHECK(temporary_tables);
      const auto it = temporary_tables->find(table_id);
      LOG_IF(FATAL, it == temporary_tables->end())
          << "Failed to find previous query result for node " << -table_id;
      table_infos.push_back({table_id, synthesize_table_info(it->second)});
    } else {
      CHECK(input_desc.getSourceType() == InputSourceType::TABLE);
      table_infos.push_back({table_id, executor->getTableInfo(table_id)});
    }
    CHECK(!table_infos.empty());
    info_cache.insert(std::make_pair(table_id, table_infos.size() - 1));
  }
}

}  // namespace

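// Computes min/max/has-nulls chunk statistics over a columnar buffer. Small inputs are
// scanned serially; larger ones are split across a TBB task arena, with per-thread
// partial results reduced into the final stats.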
template <typename T>
void compute_table_function_col_chunk_stats(
    std::shared_ptr<ChunkMetadata>& chunk_metadata,
    const T* col_buffer,
    const T null_val) {
  const size_t row_count = chunk_metadata->numElements;
  T min_val{std::numeric_limits<T>::max()};
  T max_val{std::numeric_limits<T>::lowest()};
  bool has_nulls{false};
  constexpr size_t parallel_stats_compute_threshold = 20000UL;
  if (row_count < parallel_stats_compute_threshold) {
    for (size_t row_idx = 0; row_idx < row_count; ++row_idx) {
      const T cell_val = col_buffer[row_idx];
      if (cell_val == null_val) {
        has_nulls = true;
        continue;
      }
      if (cell_val < min_val) {
        min_val = cell_val;
      }
      if (cell_val > max_val) {
        max_val = cell_val;
      }
    }
  } else {
    const size_t max_thread_count = std::thread::hardware_concurrency();
    const size_t max_inputs_per_thread = 20000;
    const size_t min_grain_size = max_inputs_per_thread / 2;
    const size_t num_threads =
        std::min(max_thread_count,
                 ((row_count + max_inputs_per_thread - 1) / max_inputs_per_thread));

    std::vector<T> threads_local_mins(num_threads, std::numeric_limits<T>::max());
    std::vector<T> threads_local_maxes(num_threads, std::numeric_limits<T>::lowest());
    std::vector<bool> threads_local_has_nulls(num_threads, false);
    tbb::task_arena limited_arena(num_threads);
    tbb::task_group tg;

    limited_arena.execute([&] {
      tg.run([&] {
        tbb::parallel_for(
            tbb::blocked_range<size_t>(0, row_count, min_grain_size),
            [&](const tbb::blocked_range<size_t>& r) {
              const size_t start_idx = r.begin();
              const size_t end_idx = r.end();
              T local_min_val = std::numeric_limits<T>::max();
              T local_max_val = std::numeric_limits<T>::lowest();
              bool local_has_nulls = false;
              for (size_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
                const T cell_val = col_buffer[row_idx];
                if (cell_val == null_val) {
                  local_has_nulls = true;
                  continue;
                }
                if (cell_val < local_min_val) {
                  local_min_val = cell_val;
                }
                if (cell_val > local_max_val) {
                  local_max_val = cell_val;
                }
              }
              size_t thread_idx = tbb::this_task_arena::current_thread_index();
              if (local_min_val < threads_local_mins[thread_idx]) {
                threads_local_mins[thread_idx] = local_min_val;
              }
              if (local_max_val > threads_local_maxes[thread_idx]) {
                threads_local_maxes[thread_idx] = local_max_val;
              }
              if (local_has_nulls) {
                threads_local_has_nulls[thread_idx] = true;
              }
            },
            tbb::simple_partitioner());
      });
    });

    limited_arena.execute([&] { tg.wait(); });
    for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
      if (threads_local_mins[thread_idx] < min_val) {
        min_val = threads_local_mins[thread_idx];
      }
      if (threads_local_maxes[thread_idx] > max_val) {
        max_val = threads_local_maxes[thread_idx];
      }
      has_nulls |= threads_local_has_nulls[thread_idx];
    }
  }
  chunk_metadata->fillChunkStats(min_val, max_val, has_nulls);
}

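// Builds per-column ChunkMetadata for a table function result set by scanning its
// columnar output buffers directly; only fixed-width types and dictionary-encoded
// text are expected here.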
ChunkMetadataMap synthesize_metadata_table_function(const ResultSet* rows) {
  CHECK(rows->getQueryMemDesc().getQueryDescriptionType() ==
        QueryDescriptionType::TableFunction);
  CHECK(rows->didOutputColumnar());
  CHECK(!(rows->areAnyColumnsLazyFetched()));
  const size_t col_count = rows->colCount();
  const auto row_count = rows->entryCount();

  ChunkMetadataMap chunk_metadata_map;

  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const int8_t* columnar_buffer = const_cast<int8_t*>(rows->getColumnarBuffer(col_idx));
    const auto col_sql_type_info = rows->getColType(col_idx);
    const auto col_type = col_sql_type_info.get_type();
    if (col_type != kTEXT) {
      CHECK(col_sql_type_info.get_compression() == kENCODING_NONE);
    } else {
      CHECK(col_sql_type_info.get_compression() == kENCODING_DICT);
      CHECK_EQ(col_sql_type_info.get_size(), sizeof(int32_t));
    }
    std::shared_ptr<ChunkMetadata> chunk_metadata = std::make_shared<ChunkMetadata>();
    chunk_metadata->sqlType = col_sql_type_info;
    chunk_metadata->numBytes = row_count * col_sql_type_info.get_size();
    chunk_metadata->numElements = row_count;

    switch (col_sql_type_info.get_type()) {
      case kBOOLEAN:
      case kTINYINT:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            columnar_buffer,
            static_cast<int8_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
        break;
      case kSMALLINT:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const int16_t*>(columnar_buffer),
            static_cast<int16_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
        break;
      case kINT:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const int32_t*>(columnar_buffer),
            static_cast<int32_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
        break;
      case kBIGINT:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const int64_t*>(columnar_buffer),
            static_cast<int64_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
        break;
      case kFLOAT:
        // For float, use the typed null accessor, as the generic one converts to double
        // and we do not want to risk loss of precision.
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const float*>(columnar_buffer),
            inline_fp_null_value<float>());
        break;
      case kDOUBLE:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const double*>(columnar_buffer),
            inline_fp_null_value<double>());
        break;
      case kTEXT:
        compute_table_function_col_chunk_stats(
            chunk_metadata,
            reinterpret_cast<const int32_t*>(columnar_buffer),
            static_cast<int32_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
        break;
      default:
        UNREACHABLE();
    }
    chunk_metadata_map.emplace(col_idx, chunk_metadata);
  }
  return chunk_metadata_map;
}

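// Builds per-column ChunkMetadata for an arbitrary result set by replaying its rows
// through dummy encoders. Table function results take the columnar fast path above;
// other result sets are scanned either in parallel (one encoder set per worker,
// reduced afterwards) or serially.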
ChunkMetadataMap synthesize_metadata(const ResultSet* rows) {
  auto timer = DEBUG_TIMER(__func__);
  rows->moveToBegin();
  if (rows->getQueryMemDesc().getQueryDescriptionType() ==
      QueryDescriptionType::TableFunction) {
    return synthesize_metadata_table_function(rows);
  }
  std::vector<std::vector<std::unique_ptr<Encoder>>> dummy_encoders;
  const size_t worker_count =
      use_parallel_algorithms(*rows) ? cpu_threads() : 1;
  for (size_t worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
    dummy_encoders.emplace_back();
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = rows->getColType(i);
      dummy_encoders.back().emplace_back(Encoder::Create(nullptr, col_ti));
    }
  }
  const auto do_work = [rows](const std::vector<TargetValue>& crt_row,
                              std::vector<std::unique_ptr<Encoder>>& dummy_encoders) {
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = rows->getColType(i);
      const auto& col_val = crt_row[i];
      const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
      CHECK(scalar_col_val);
      if (uses_int_meta(col_ti)) {
        const auto i64_p = boost::get<int64_t>(scalar_col_val);
        CHECK(i64_p);
        dummy_encoders[i]->updateStats(*i64_p, *i64_p == inline_int_null_val(col_ti));
      } else if (col_ti.is_fp()) {
        switch (col_ti.get_type()) {
          case kFLOAT: {
            const auto float_p = boost::get<float>(scalar_col_val);
            CHECK(float_p);
            dummy_encoders[i]->updateStats(*float_p,
                                           *float_p == inline_fp_null_val(col_ti));
            break;
          }
          case kDOUBLE: {
            const auto double_p = boost::get<double>(scalar_col_val);
            CHECK(double_p);
            dummy_encoders[i]->updateStats(*double_p,
                                           *double_p == inline_fp_null_val(col_ti));
            break;
          }
          default:
            CHECK(false);
        }
      } else {
        throw std::runtime_error(col_ti.get_type_name() +
                                 " is not supported in temporary table.");
      }
    }
  };
  if (use_parallel_algorithms(*rows)) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> compute_stats_threads;
    const auto entry_count = rows->entryCount();
    for (size_t i = 0,
                start_entry = 0,
                stride = (entry_count + worker_count - 1) / worker_count;
         i < worker_count && start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, entry_count);
      compute_stats_threads.push_back(std::async(
          std::launch::async,
          [rows, &do_work, &dummy_encoders](
              const size_t start, const size_t end, const size_t worker_idx) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows->getRowAtNoTranslations(i);
              if (!crt_row.empty()) {
                do_work(crt_row, dummy_encoders[worker_idx]);
              }
            }
          },
          start_entry,
          end_entry,
          i));
    }
    for (auto& child : compute_stats_threads) {
      child.wait();
    }
    for (auto& child : compute_stats_threads) {
      child.get();
    }
  } else {
    while (true) {
      auto crt_row = rows->getNextRow(false, false);
      if (crt_row.empty()) {
        break;
      }
      do_work(crt_row, dummy_encoders[0]);
    }
    rows->moveToBegin();
  }
  ChunkMetadataMap metadata_map;
  for (size_t worker_idx = 1; worker_idx < worker_count; ++worker_idx) {
    CHECK_LT(worker_idx, dummy_encoders.size());
    const auto& worker_encoders = dummy_encoders[worker_idx];
    for (size_t i = 0; i < rows->colCount(); ++i) {
      dummy_encoders[0][i]->reduceStats(*worker_encoders[i]);
    }
  }
  for (size_t i = 0; i < rows->colCount(); ++i) {
    const auto it_ok =
        metadata_map.emplace(i, dummy_encoders[0][i]->getMetadata(rows->getColType(i)));
    CHECK(it_ok.second);
  }
  return metadata_map;
}

size_t get_frag_count_of_table(const int table_id, Executor* executor) {
  const auto temporary_tables = executor->getTemporaryTables();
  CHECK(temporary_tables);
  auto it = temporary_tables->find(table_id);
  if (it != temporary_tables->end()) {
    CHECK_GE(int(0), table_id);
    return size_t(1);
  } else {
    const auto table_info = executor->getTableInfo(table_id);
    return table_info.fragments.size();
  }
}

std::vector<InputTableInfo> get_table_infos(
    const std::vector<InputDescriptor>& input_descs,
    Executor* executor) {
  std::vector<InputTableInfo> table_infos;
  collect_table_infos(table_infos, input_descs, executor);
  return table_infos;
}

std::vector<InputTableInfo> get_table_infos(const RelAlgExecutionUnit& ra_exe_unit,
                                            Executor* executor) {
  INJECT_TIMER(get_table_infos);
  std::vector<InputTableInfo> table_infos;
  collect_table_infos(table_infos, ra_exe_unit.input_descs, executor);
  return table_infos;
}

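// Fragments backed by a result set synthesize their chunk metadata lazily on first
// access and cache it for later calls.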
const ChunkMetadataMap& Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap() const {
  if (resultSet && !synthesizedMetadataIsValid) {
    chunkMetadataMap = synthesize_metadata(resultSet);
    synthesizedMetadataIsValid = true;
  }
  return chunkMetadataMap;
}

ChunkMetadataMap Fragmenter_Namespace::FragmentInfo::getChunkMetadataMapPhysicalCopy()
    const {
  ChunkMetadataMap metadata_map;
  for (const auto& [column_id, chunk_metadata] : chunkMetadataMap) {
    metadata_map[column_id] = std::make_shared<ChunkMetadata>(*chunk_metadata);
  }
  return metadata_map;
}

size_t Fragmenter_Namespace::FragmentInfo::getNumTuples() const {
  std::unique_ptr<std::lock_guard<std::mutex>> lock;
  if (resultSetMutex) {
    lock.reset(new std::lock_guard<std::mutex>(*resultSetMutex));
  }
  CHECK_EQ(!!resultSet, !!resultSetMutex);
  if (resultSet && !synthesizedNumTuplesIsValid) {
    numTuples = resultSet->rowCount();
    synthesizedNumTuplesIsValid = true;
  }
  return numTuples;
}

size_t Fragmenter_Namespace::TableInfo::getNumTuples() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().getNumTuples();
  }
  return numTuples;
}

size_t Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  return numTuples;
}

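// Upper bound on the tuple count of any single fragment: entryCount() for result-set
// backed tables, otherwise the largest per-fragment tuple count.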
size_t Fragmenter_Namespace::TableInfo::getFragmentNumTuplesUpperBound() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  size_t fragment_num_tuples_upper_bound = 0;
  for (const auto& fragment : fragments) {
    fragment_num_tuples_upper_bound =
        std::max(fragment.getNumTuples(), fragment_num_tuples_upper_bound);
  }
  return fragment_num_tuples_upper_bound;
}