OmniSciDB  085a039ca4
InputMetadata.cpp File Reference
#include "InputMetadata.h"
#include "Execute.h"
#include "../Fragmenter/Fragmenter.h"
#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>
#include <future>

Namespaces

 anonymous_namespace{InputMetadata.cpp}
 
Functions

Fragmenter_Namespace::TableInfo anonymous_namespace{InputMetadata.cpp}::copy_table_info (const Fragmenter_Namespace::TableInfo &table_info)
 
Fragmenter_Namespace::TableInfo build_table_info (const std::vector< const TableDescriptor * > &shard_tables)
 
bool anonymous_namespace{InputMetadata.cpp}::uses_int_meta (const SQLTypeInfo &col_ti)
 
Fragmenter_Namespace::TableInfo anonymous_namespace{InputMetadata.cpp}::synthesize_table_info (const ResultSetPtr &rows)
 
void anonymous_namespace{InputMetadata.cpp}::collect_table_infos (std::vector< InputTableInfo > &table_infos, const std::vector< InputDescriptor > &input_descs, Executor *executor)
 
template<typename T >
void compute_table_function_col_chunk_stats (std::shared_ptr< ChunkMetadata > &chunk_metadata, const T *col_buffer, const T null_val)
 
ChunkMetadataMap synthesize_metadata_table_function (const ResultSet *rows)
 
ChunkMetadataMap synthesize_metadata (const ResultSet *rows)
 
size_t get_frag_count_of_table (const int table_id, Executor *executor)
 
std::vector< InputTableInfo > get_table_infos (const std::vector< InputDescriptor > &input_descs, Executor *executor)
 
std::vector< InputTableInfo > get_table_infos (const RelAlgExecutionUnit &ra_exe_unit, Executor *executor)
 

Variables

bool g_enable_data_recycler
 
bool g_use_chunk_metadata_cache
 

Function Documentation

Fragmenter_Namespace::TableInfo build_table_info ( const std::vector< const TableDescriptor * > &  shard_tables)

Definition at line 44 of file InputMetadata.cpp.

References CHECK, Fragmenter_Namespace::TableInfo::fragments, and Fragmenter_Namespace::TableInfo::setPhysicalNumTuples().

Referenced by InputTableInfoCache::getTableInfo().

45  {
46  size_t total_number_of_tuples{0};
47  Fragmenter_Namespace::TableInfo table_info_all_shards;
48  for (const TableDescriptor* shard_table : shard_tables) {
49  CHECK(shard_table->fragmenter);
50  const auto& shard_metainfo = shard_table->fragmenter->getFragmentsForQuery();
51  total_number_of_tuples += shard_metainfo.getPhysicalNumTuples();
52  table_info_all_shards.fragments.reserve(table_info_all_shards.fragments.size() +
53  shard_metainfo.fragments.size());
54  table_info_all_shards.fragments.insert(table_info_all_shards.fragments.end(),
55  shard_metainfo.fragments.begin(),
56  shard_metainfo.fragments.end());
57  }
58  table_info_all_shards.setPhysicalNumTuples(total_number_of_tuples);
59  return table_info_all_shards;
60 }

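build_table_info() flattens the per-shard fragment lists into one TableInfo: it sums the shards' physical tuple counts, reserves the combined capacity before each append so the fragment vector grows predictably, and concatenates the shards' fragments in order. A minimal sketch of the same merge pattern, with hypothetical stand-in structs rather than the real Fragmenter_Namespace types:

#include <cstddef>
#include <vector>

// Hypothetical stand-ins for Fragmenter_Namespace::{FragmentInfo, TableInfo}.
struct FragmentInfo {
  size_t num_tuples{0};
};

struct TableInfo {
  std::vector<FragmentInfo> fragments;
  size_t physical_num_tuples{0};
};

// Merge shard-level metadata the way build_table_info does: sum the tuple
// counts, reserve capacity up front, then append each shard's fragments.
TableInfo merge_shard_infos(const std::vector<TableInfo>& shards) {
  TableInfo merged;
  size_t total_tuples{0};
  for (const auto& shard : shards) {
    total_tuples += shard.physical_num_tuples;
    merged.fragments.reserve(merged.fragments.size() + shard.fragments.size());
    merged.fragments.insert(
        merged.fragments.end(), shard.fragments.begin(), shard.fragments.end());
  }
  merged.physical_num_tuples = total_tuples;
  return merged;
}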

template<typename T >
void compute_table_function_col_chunk_stats ( std::shared_ptr< ChunkMetadata > &  chunk_metadata,
const T *  col_buffer,
const T  null_val 
)

Definition at line 141 of file InputMetadata.cpp.

References max_inputs_per_thread and tbb::parallel_for().

Referenced by synthesize_metadata_table_function().

144  {
145  const size_t row_count = chunk_metadata->numElements;
146  T min_val{std::numeric_limits<T>::max()};
147  T max_val{std::numeric_limits<T>::lowest()};
148  bool has_nulls{false};
149  constexpr size_t parallel_stats_compute_threshold = 20000UL;
150  if (row_count < parallel_stats_compute_threshold) {
151  for (size_t row_idx = 0; row_idx < row_count; ++row_idx) {
152  const T cell_val = col_buffer[row_idx];
153  if (cell_val == null_val) {
154  has_nulls = true;
155  continue;
156  }
157  if (cell_val < min_val) {
158  min_val = cell_val;
159  }
160  if (cell_val > max_val) {
161  max_val = cell_val;
162  }
163  }
164  } else {
165  const size_t max_thread_count = std::thread::hardware_concurrency();
166  const size_t max_inputs_per_thread = 20000;
167  const size_t min_grain_size = max_inputs_per_thread / 2;
168  const size_t num_threads =
169  std::min(max_thread_count,
170  ((row_count + max_inputs_per_thread - 1) / max_inputs_per_thread));
171 
172  std::vector<T> threads_local_mins(num_threads, std::numeric_limits<T>::max());
173  std::vector<T> threads_local_maxes(num_threads, std::numeric_limits<T>::lowest());
174  std::vector<bool> threads_local_has_nulls(num_threads, false);
175  tbb::task_arena limited_arena(num_threads);
176 
177  limited_arena.execute([&] {
178  tbb::parallel_for(
179  tbb::blocked_range<size_t>(0, row_count, min_grain_size),
180  [&](const tbb::blocked_range<size_t>& r) {
181  const size_t start_idx = r.begin();
182  const size_t end_idx = r.end();
183  T local_min_val = std::numeric_limits<T>::max();
184  T local_max_val = std::numeric_limits<T>::lowest();
185  bool local_has_nulls = false;
186  for (size_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
187  const T cell_val = col_buffer[row_idx];
188  if (cell_val == null_val) {
189  local_has_nulls = true;
190  continue;
191  }
192  if (cell_val < local_min_val) {
193  local_min_val = cell_val;
194  }
195  if (cell_val > local_max_val) {
196  local_max_val = cell_val;
197  }
198  }
199  size_t thread_idx = tbb::this_task_arena::current_thread_index();
200  if (local_min_val < threads_local_mins[thread_idx]) {
201  threads_local_mins[thread_idx] = local_min_val;
202  }
203  if (local_max_val > threads_local_maxes[thread_idx]) {
204  threads_local_maxes[thread_idx] = local_max_val;
205  }
206  if (local_has_nulls) {
207  threads_local_has_nulls[thread_idx] = true;
208  }
209  },
210  tbb::simple_partitioner());
211  });
212 
213  for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
214  if (threads_local_mins[thread_idx] < min_val) {
215  min_val = threads_local_mins[thread_idx];
216  }
217  if (threads_local_maxes[thread_idx] > max_val) {
218  max_val = threads_local_maxes[thread_idx];
219  }
220  has_nulls |= threads_local_has_nulls[thread_idx];
221  }
222  }
223  chunk_metadata->fillChunkStats(min_val, max_val, has_nulls);
224 }

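For small inputs the stats are computed in one serial pass; past the 20,000-row threshold the function sizes a tbb::task_arena at roughly one thread per 20,000 rows and gives each arena thread its own min/max/null slot, indexed by tbb::this_task_arena::current_thread_index(), so the scan needs no synchronization until the final serial reduce. Below is a self-contained sketch of that arena-plus-thread-local-slots pattern; the int64_t element type and sentinel null value are illustrative assumptions, not the real ChunkMetadata interface.

#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>

#include <algorithm>
#include <cstdint>
#include <limits>
#include <thread>
#include <vector>

struct ColumnStats {
  int64_t min_val{std::numeric_limits<int64_t>::max()};
  int64_t max_val{std::numeric_limits<int64_t>::lowest()};
  bool has_nulls{false};
};

// Min/max/has_nulls over a column, skipping a sentinel null, with one
// accumulator slot per arena thread (same shape as the function above).
ColumnStats column_stats(const std::vector<int64_t>& col, const int64_t null_val) {
  const size_t max_inputs_per_thread = 20000;  // heuristic from the source
  const size_t num_threads = std::max<size_t>(
      1,
      std::min<size_t>(std::thread::hardware_concurrency(),
                       (col.size() + max_inputs_per_thread - 1) /
                           max_inputs_per_thread));

  std::vector<int64_t> mins(num_threads, std::numeric_limits<int64_t>::max());
  std::vector<int64_t> maxes(num_threads, std::numeric_limits<int64_t>::lowest());
  std::vector<char> nulls(num_threads, 0);  // char keeps the slots independent

  tbb::task_arena limited_arena(num_threads);
  limited_arena.execute([&] {
    tbb::parallel_for(
        tbb::blocked_range<size_t>(0, col.size(), max_inputs_per_thread / 2),
        [&](const tbb::blocked_range<size_t>& r) {
          // The index is stable and < num_threads inside the sized arena.
          const int slot = tbb::this_task_arena::current_thread_index();
          for (size_t i = r.begin(); i < r.end(); ++i) {
            if (col[i] == null_val) {
              nulls[slot] = 1;
              continue;
            }
            mins[slot] = std::min(mins[slot], col[i]);
            maxes[slot] = std::max(maxes[slot], col[i]);
          }
        },
        tbb::simple_partitioner());
  });

  ColumnStats stats;
  for (size_t t = 0; t < num_threads; ++t) {  // serial reduce over the slots
    stats.min_val = std::min(stats.min_val, mins[t]);
    stats.max_val = std::max(stats.max_val, maxes[t]);
    stats.has_nulls |= (nulls[t] != 0);
  }
  return stats;
}

Each slot is written only by the arena thread that owns it, which is also why the sketch uses std::vector<char> for the null flags: std::vector<bool> packs bits, so concurrent writes to adjacent slots could race.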

size_t get_frag_count_of_table ( const int  table_id,
Executor executor 
)

Definition at line 430 of file InputMetadata.cpp.

References CHECK, and CHECK_GE.

Referenced by RelAlgExecutor::getOuterFragmentCount().

430  {
431  const auto temporary_tables = executor->getTemporaryTables();
432  CHECK(temporary_tables);
433  auto it = temporary_tables->find(table_id);
434  if (it != temporary_tables->end()) {
435  CHECK_GE(int(0), table_id);
436  return size_t(1);
437  } else {
438  const auto table_info = executor->getTableInfo(table_id);
439  return table_info.fragments.size();
440  }
441 }


std::vector<InputTableInfo> get_table_infos ( const std::vector< InputDescriptor > &  input_descs,
Executor executor 
)

Definition at line 443 of file InputMetadata.cpp.

References anonymous_namespace{InputMetadata.cpp}::collect_table_infos().

Referenced by RelAlgExecutor::computeWindow(), RelAlgExecutor::createAggregateWorkUnit(), RelAlgExecutor::createCompoundWorkUnit(), RelAlgExecutor::createFilterWorkUnit(), RelAlgExecutor::createProjectWorkUnit(), RelAlgExecutor::createTableFunctionWorkUnit(), RelAlgExecutor::createUnionWorkUnit(), RelAlgExecutor::executeDelete(), RelAlgExecutor::executeTableFunction(), RelAlgExecutor::executeUpdate(), RelAlgExecutor::executeWorkUnit(), get_table_infos(), TableOptimizer::getDeletedColumnStats(), RelAlgExecutor::getFilteredCountAll(), RelAlgExecutor::getFilterSelectivity(), RelAlgExecutor::getNDVEstimation(), RelAlgExecutor::handleOutOfMemoryRetry(), TableOptimizer::recomputeColumnMetadata(), and RelAlgExecutor::selectFiltersToBePushedDown().

445  {
446  std::vector<InputTableInfo> table_infos;
447  collect_table_infos(table_infos, input_descs, executor);
448  return table_infos;
449 }


std::vector<InputTableInfo> get_table_infos ( const RelAlgExecutionUnit ra_exe_unit,
Executor executor 
)

Definition at line 451 of file InputMetadata.cpp.

References anonymous_namespace{InputMetadata.cpp}::collect_table_infos(), get_table_infos(), INJECT_TIMER, and RelAlgExecutionUnit::input_descs.

452  {
453  INJECT_TIMER(get_table_infos);
454  std::vector<InputTableInfo> table_infos;
455  collect_table_infos(table_infos, ra_exe_unit.input_descs, executor);
456  return table_infos;
457 }


ChunkMetadataMap synthesize_metadata ( const ResultSet rows)

Definition at line 306 of file InputMetadata.cpp.

References std::async(), CHECK, CHECK_LT, cpu_threads(), Encoder::Create(), DEBUG_TIMER, inline_fp_null_val(), inline_int_null_val(), kDOUBLE, kFLOAT, synthesize_metadata_table_function(), TableFunction, result_set::use_parallel_algorithms(), and anonymous_namespace{InputMetadata.cpp}::uses_int_meta().

Referenced by Fragmenter_Namespace::FragmentInfo::getChunkMetadataMap().

306  {
307  auto timer = DEBUG_TIMER(__func__);
308  ChunkMetadataMap metadata_map;
309 
310  if (rows->definitelyHasNoRows()) {
311  // resultset has no valid storage, so we fill dummy metadata and return early
312  std::vector<std::unique_ptr<Encoder>> decoders;
313  for (size_t i = 0; i < rows->colCount(); ++i) {
314  decoders.emplace_back(Encoder::Create(nullptr, rows->getColType(i)));
315  const auto it_ok =
316  metadata_map.emplace(i, decoders.back()->getMetadata(rows->getColType(i)));
317  CHECK(it_ok.second);
318  }
319  return metadata_map;
320  }
321 
322  std::vector<std::vector<std::unique_ptr<Encoder>>> dummy_encoders;
323  const size_t worker_count =
324  result_set::use_parallel_algorithms(*rows) ? cpu_threads() : 1;
325  for (size_t worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
326  dummy_encoders.emplace_back();
327  for (size_t i = 0; i < rows->colCount(); ++i) {
328  const auto& col_ti = rows->getColType(i);
329  dummy_encoders.back().emplace_back(Encoder::Create(nullptr, col_ti));
330  }
331  }
332 
333  if (rows->getQueryMemDesc().getQueryDescriptionType() ==
334  QueryDescriptionType::TableFunction) {
335  return synthesize_metadata_table_function(rows);
336  }
337  rows->moveToBegin();
338  const auto do_work = [rows](const std::vector<TargetValue>& crt_row,
339  std::vector<std::unique_ptr<Encoder>>& dummy_encoders) {
340  for (size_t i = 0; i < rows->colCount(); ++i) {
341  const auto& col_ti = rows->getColType(i);
342  const auto& col_val = crt_row[i];
343  const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
344  CHECK(scalar_col_val);
345  if (uses_int_meta(col_ti)) {
346  const auto i64_p = boost::get<int64_t>(scalar_col_val);
347  CHECK(i64_p);
348  dummy_encoders[i]->updateStats(*i64_p, *i64_p == inline_int_null_val(col_ti));
349  } else if (col_ti.is_fp()) {
350  switch (col_ti.get_type()) {
351  case kFLOAT: {
352  const auto float_p = boost::get<float>(scalar_col_val);
353  CHECK(float_p);
354  dummy_encoders[i]->updateStats(*float_p,
355  *float_p == inline_fp_null_val(col_ti));
356  break;
357  }
358  case kDOUBLE: {
359  const auto double_p = boost::get<double>(scalar_col_val);
360  CHECK(double_p);
361  dummy_encoders[i]->updateStats(*double_p,
362  *double_p == inline_fp_null_val(col_ti));
363  break;
364  }
365  default:
366  CHECK(false);
367  }
368  } else {
369  throw std::runtime_error(col_ti.get_type_name() +
370  " is not supported in temporary table.");
371  }
372  }
373  };
374  if (result_set::use_parallel_algorithms(*rows)) {
375  const size_t worker_count = cpu_threads();
376  std::vector<std::future<void>> compute_stats_threads;
377  const auto entry_count = rows->entryCount();
378  for (size_t i = 0,
379  start_entry = 0,
380  stride = (entry_count + worker_count - 1) / worker_count;
381  i < worker_count && start_entry < entry_count;
382  ++i, start_entry += stride) {
383  const auto end_entry = std::min(start_entry + stride, entry_count);
384  compute_stats_threads.push_back(std::async(
385  std::launch::async,
386  [rows, &do_work, &dummy_encoders](
387  const size_t start, const size_t end, const size_t worker_idx) {
388  for (size_t i = start; i < end; ++i) {
389  const auto crt_row = rows->getRowAtNoTranslations(i);
390  if (!crt_row.empty()) {
391  do_work(crt_row, dummy_encoders[worker_idx]);
392  }
393  }
394  },
395  start_entry,
396  end_entry,
397  i));
398  }
399  for (auto& child : compute_stats_threads) {
400  child.wait();
401  }
402  for (auto& child : compute_stats_threads) {
403  child.get();
404  }
405  } else {
406  while (true) {
407  auto crt_row = rows->getNextRow(false, false);
408  if (crt_row.empty()) {
409  break;
410  }
411  do_work(crt_row, dummy_encoders[0]);
412  }
413  }
414  rows->moveToBegin();
415  for (size_t worker_idx = 1; worker_idx < worker_count; ++worker_idx) {
416  CHECK_LT(worker_idx, dummy_encoders.size());
417  const auto& worker_encoders = dummy_encoders[worker_idx];
418  for (size_t i = 0; i < rows->colCount(); ++i) {
419  dummy_encoders[0][i]->reduceStats(*worker_encoders[i]);
420  }
421  }
422  for (size_t i = 0; i < rows->colCount(); ++i) {
423  const auto it_ok =
424  metadata_map.emplace(i, dummy_encoders[0][i]->getMetadata(rows->getColType(i)));
425  CHECK(it_ok.second);
426  }
427  return metadata_map;
428 }

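When result_set::use_parallel_algorithms() approves the result set, the function splits the entry range into one contiguous stride per worker, launches each stride via std::async, and lets every worker mutate only its own set of dummy encoders; afterwards worker 0's encoders absorb the others through reduceStats(), so the scan itself is lock-free. A stripped-down sketch of that fork/scan/reduce shape follows; the Accumulator stand-in and the int64_t row source are illustrative assumptions, not the real Encoder API.

#include <algorithm>
#include <cstdint>
#include <future>
#include <limits>
#include <vector>

// Hypothetical stand-in for a per-column dummy Encoder.
struct Accumulator {
  int64_t min_val{std::numeric_limits<int64_t>::max()};
  int64_t max_val{std::numeric_limits<int64_t>::lowest()};
  void update(const int64_t v) {
    min_val = std::min(min_val, v);
    max_val = std::max(max_val, v);
  }
  void reduce(const Accumulator& other) {  // analogous to Encoder::reduceStats
    min_val = std::min(min_val, other.min_val);
    max_val = std::max(max_val, other.max_val);
  }
};

Accumulator scan_parallel(const std::vector<int64_t>& rows, size_t worker_count) {
  worker_count = std::max<size_t>(1, worker_count);
  std::vector<Accumulator> per_worker(worker_count);
  std::vector<std::future<void>> workers;
  const size_t entry_count = rows.size();
  const size_t stride = (entry_count + worker_count - 1) / worker_count;
  // One contiguous stride per worker; each task touches only its own slot.
  for (size_t w = 0, start = 0; w < worker_count && start < entry_count;
       ++w, start += stride) {
    const size_t end = std::min(start + stride, entry_count);
    workers.push_back(std::async(
        std::launch::async,
        [&rows, &per_worker](const size_t s, const size_t e, const size_t slot) {
          for (size_t i = s; i < e; ++i) {
            per_worker[slot].update(rows[i]);
          }
        },
        start,
        end,
        w));
  }
  for (auto& worker : workers) {
    worker.get();  // joins the task and rethrows any worker exception
  }
  for (size_t w = 1; w < per_worker.size(); ++w) {  // serial reduce into slot 0
    per_worker[0].reduce(per_worker[w]);
  }
  return per_worker[0];
}

The wait()-then-get() pair in the real code serves the same purpose as the single get() loop here: get() both joins a task and propagates anything it threw.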

ChunkMetadataMap synthesize_metadata_table_function ( const ResultSet rows)

Definition at line 226 of file InputMetadata.cpp.

References CHECK, CHECK_EQ, compute_table_function_col_chunk_stats(), inline_fixed_encoding_null_val(), inline_fp_null_value< double >(), inline_fp_null_value< float >(), kBIGINT, kBOOLEAN, kDOUBLE, kENCODING_DICT, kENCODING_NONE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIMESTAMP, kTINYINT, TableFunction, and UNREACHABLE.

Referenced by synthesize_metadata().

226  {
227  CHECK(rows->getQueryMemDesc().getQueryDescriptionType() ==
228  QueryDescriptionType::TableFunction);
229  CHECK(rows->didOutputColumnar());
230  CHECK(!(rows->areAnyColumnsLazyFetched()));
231  const size_t col_count = rows->colCount();
232  const auto row_count = rows->entryCount();
233 
234  ChunkMetadataMap chunk_metadata_map;
235 
236  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
237  const int8_t* columnar_buffer = const_cast<int8_t*>(rows->getColumnarBuffer(col_idx));
238  const auto col_sql_type_info = rows->getColType(col_idx);
239  const auto col_type = col_sql_type_info.get_type();
240  if (col_type != kTEXT) {
241  CHECK(col_sql_type_info.get_compression() == kENCODING_NONE);
242  } else {
243  CHECK(col_sql_type_info.get_compression() == kENCODING_DICT);
244  CHECK_EQ(col_sql_type_info.get_size(), sizeof(int32_t));
245  }
246  std::shared_ptr<ChunkMetadata> chunk_metadata = std::make_shared<ChunkMetadata>();
247  chunk_metadata->sqlType = col_sql_type_info;
248  chunk_metadata->numBytes = row_count * col_sql_type_info.get_size();
249  chunk_metadata->numElements = row_count;
250 
251  switch (col_sql_type_info.get_type()) {
252  case kBOOLEAN:
253  case kTINYINT:
254  compute_table_function_col_chunk_stats(
255  chunk_metadata,
256  columnar_buffer,
257  static_cast<int8_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
258  break;
259  case kSMALLINT:
260  compute_table_function_col_chunk_stats(
261  chunk_metadata,
262  reinterpret_cast<const int16_t*>(columnar_buffer),
263  static_cast<int16_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
264  break;
265  case kINT:
266  compute_table_function_col_chunk_stats(
267  chunk_metadata,
268  reinterpret_cast<const int32_t*>(columnar_buffer),
269  static_cast<int32_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
270  break;
271  case kBIGINT:
272  case kTIMESTAMP:
273  compute_table_function_col_chunk_stats(
274  chunk_metadata,
275  reinterpret_cast<const int64_t*>(columnar_buffer),
276  static_cast<int64_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
277  break;
278  case kFLOAT:
279  // For float use the typed null accessor as the generic one converts to double,
280  // and we do not want to risk loss of precision
281  compute_table_function_col_chunk_stats(
282  chunk_metadata,
283  reinterpret_cast<const float*>(columnar_buffer),
284  inline_fp_null_value<float>());
285  break;
286  case kDOUBLE:
287  compute_table_function_col_chunk_stats(
288  chunk_metadata,
289  reinterpret_cast<const double*>(columnar_buffer),
290  inline_fp_null_value<double>());
291  break;
292  case kTEXT:
293  compute_table_function_col_chunk_stats(
294  chunk_metadata,
295  reinterpret_cast<const int32_t*>(columnar_buffer),
296  static_cast<int32_t>(inline_fixed_encoding_null_val(col_sql_type_info)));
297  break;
298  default:
299  UNREACHABLE();
300  }
301  chunk_metadata_map.emplace(col_idx, chunk_metadata);
302  }
303  return chunk_metadata_map;
304 }

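The switch exists because getColumnarBuffer() returns a type-erased int8_t*: each case reinterprets the buffer at the element width the SQL type implies and pairs it with the matching inline null sentinel before handing it to the templated stats routine, and dictionary-encoded TEXT is scanned as its 32-bit dictionary ids. A minimal sketch of that dispatch idiom; the ColType tag and the sentinel values are illustrative assumptions, not OmniSci's inline_fixed_encoding_null_val() machinery.

#include <cstddef>
#include <cstdint>
#include <stdexcept>

enum class ColType { Boolean, TinyInt, SmallInt, Int, BigInt, DictText };

// Templated scan, analogous to compute_table_function_col_chunk_stats.
template <typename T>
void stats_over(const T* buf, const size_t n, const T null_val) {
  // ... scan buf[0..n) at the native width of T, skipping null_val ...
}

// Reinterpret the erased buffer at the width implied by the type tag and
// pair it with that width's null sentinel (sentinels here are illustrative).
void dispatch_stats(const ColType type, const int8_t* buf, const size_t n) {
  switch (type) {
    case ColType::Boolean:
    case ColType::TinyInt:
      stats_over<int8_t>(buf, n, INT8_MIN);
      break;
    case ColType::SmallInt:
      stats_over<int16_t>(reinterpret_cast<const int16_t*>(buf), n, INT16_MIN);
      break;
    case ColType::Int:
    case ColType::DictText:  // dictionary ids are 32-bit integers
      stats_over<int32_t>(reinterpret_cast<const int32_t*>(buf), n, INT32_MIN);
      break;
    case ColType::BigInt:
      stats_over<int64_t>(reinterpret_cast<const int64_t*>(buf), n, INT64_MIN);
      break;
    default:
      throw std::runtime_error("unsupported column type");
  }
}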

Variable Documentation

bool g_enable_data_recycler

Definition at line 146 of file Execute.cpp.

bool g_use_chunk_metadata_cache

Definition at line 149 of file Execute.cpp.