OmniSciDB  1dac507f6e
TableOptimizer Class Reference

Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanup processes that improve performance on a table. Only tables that have been modified using updates or deletes are candidates for cleanup. If the table descriptor corresponds to a sharded table, table optimizer processes each physical shard.

#include <TableOptimizer.h>


Public Member Functions

 TableOptimizer (const TableDescriptor *td, Executor *executor, const Catalog_Namespace::Catalog &cat)
 
void recomputeMetadata () const
 Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk metadata to become wider than the values in the chunk. Recomputing the metadata narrows the range to fit the chunk, as well as setting or unsetting the nulls flag as appropriate.
 
void vacuumDeletedRows () const
 Compacts fragments to remove deleted rows. When a row is deleted, a boolean deleted system column is set to true. Vacuuming removes all deleted rows from a fragment. Note that vacuuming is a checkpointing operation, so data on disk will increase even though the number of rows for the current epoch has decreased.
 

Private Attributes

const TableDescriptor * td_
 
Executor * executor_
 
const Catalog_Namespace::Catalog & cat_
 

Detailed Description

Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanup processes that improve performance on a table. Only tables that have been modified using updates or deletes are candidates for cleanup. If the table descriptor corresponds to a sharded table, table optimizer processes each physical shard.

Definition at line 31 of file TableOptimizer.h.

Constructor & Destructor Documentation

TableOptimizer::TableOptimizer (const TableDescriptor *td, Executor *executor, const Catalog_Namespace::Catalog &cat)

Definition at line 24 of file TableOptimizer.cpp.

References CHECK().

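TableOptimizer::TableOptimizer(const TableDescriptor* td,
                               Executor* executor,
                               const Catalog_Namespace::Catalog& cat)
    : td_(td), executor_(executor), cat_(cat) {
  CHECK(td);
}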


Member Function Documentation

void TableOptimizer::recomputeMetadata ( ) const

Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk metadata to become wider than the values in the chunk. Recomputing the metadata narrows the range to fit the chunk, as well as setting or unsetting the nulls flag as appropriate.
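To make the narrowing concrete, here is a minimal sketch of recomputing statistics from the values actually present in a chunk. It is not the OmniSciDB implementation: `SimpleChunkStats` and `recompute_stats` are hypothetical names, `std::nullopt` stands in for SQL NULL, and the chunk is modeled as a plain vector (C++17). As in the listing for this function, the nulls flag is derived by comparing the non-null count with the row count, and an all-null chunk is left alone.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for per-chunk statistics (not the real ChunkStats type).
struct SimpleChunkStats {
  std::int64_t min;
  std::int64_t max;
  bool has_nulls;
};

// Recomputes min, max, and the nulls flag from the values currently in the chunk.
// Returns std::nullopt when the chunk holds no non-null values, in which case the
// caller would keep whatever metadata it already has.
std::optional<SimpleChunkStats> recompute_stats(
    const std::vector<std::optional<std::int64_t>>& chunk) {
  std::optional<SimpleChunkStats> stats;
  std::size_t non_null_count = 0;
  for (const auto& value : chunk) {
    if (!value) {
      continue;  // NULLs do not contribute to min or max
    }
    ++non_null_count;
    if (!stats) {
      stats = SimpleChunkStats{*value, *value, false};
    } else {
      stats->min = std::min(stats->min, *value);
      stats->max = std::max(stats->max, *value);
    }
  }
  assert(non_null_count <= chunk.size());
  if (stats) {
    // The chunk has nulls exactly when some rows were not counted as non-null.
    stats->has_nulls = non_null_count != chunk.size();
  }
  return stats;
}
```

For example, a chunk whose stored metadata still says min 1 and max 100 after an update, but which now holds only 5, NULL, and 7, would be narrowed to min 5, max 7, with the nulls flag set.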

Definition at line 115 of file TableOptimizer.cpp.

References anonymous_namespace{TableOptimizer.cpp}::build_ra_exe_unit(), cat_, CHECK(), CHECK_EQ, CHECK_GE, ChunkMetadata::chunkStats, ColumnDescriptor::columnId, CPU, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, executor_, anonymous_namespace{TableOptimizer.cpp}::get_compilation_options(), anonymous_namespace{TableOptimizer.cpp}::get_execution_options(), get_logical_type_info(), get_table_infos(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getDeletedColumn(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), Data_Namespace::GPU_LEVEL, logger::INFO, INJECT_TIMER, kCOUNT, kINT, kMAX, kMIN, LOG, TableDescriptor::nShards, anonymous_namespace{TableOptimizer.cpp}::set_metadata_from_results(), ChunkMetadata::sqlType, TableDescriptor::tableId, TableDescriptor::tableName, td_, and logger::WARNING.

Referenced by Catalog_Namespace::Catalog::checkDateInDaysColumnMigration().

void TableOptimizer::recomputeMetadata() const {
  INJECT_TIMER(optimizeMetadata);
  std::lock_guard<std::mutex> lock(executor_->execute_mutex_);

  LOG(INFO) << "Recomputing metadata for " << td_->tableName;

  CHECK_GE(td_->tableId, 0);

  std::vector<const TableDescriptor*> table_descriptors;
  if (td_->nShards > 0) {
    const auto physical_tds = cat_.getPhysicalTablesDescriptors(td_);
    table_descriptors.insert(
        table_descriptors.begin(), physical_tds.begin(), physical_tds.end());
  } else {
    table_descriptors.push_back(td_);
  }

  auto& data_mgr = cat_.getDataMgr();

  for (const auto td : table_descriptors) {
    ScopeGuard row_set_holder = [this] { executor_->row_set_mem_owner_ = nullptr; };
    executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>();
    executor_->catalog_ = &cat_;
    const auto table_id = td->tableId;

    std::unordered_map</*fragment_id*/ int, size_t> tuple_count_map;

    // Special case handle $deleted column if it exists
    // whilst handling the delete column also capture
    // the number of non deleted rows per fragment
    if (td->hasDeletedCol) {
      auto cd = cat_.getDeletedColumn(td);
      const auto column_id = cd->columnId;

      const auto input_col_desc =
          std::make_shared<const InputColDescriptor>(column_id, table_id, 0);
      const auto col_expr =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, table_id, column_id, 0);
      const auto count_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kCOUNT, col_expr, false, nullptr);

      const auto ra_exe_unit = build_ra_exe_unit(input_col_desc, {count_expr.get()});
      const auto table_infos = get_table_infos(ra_exe_unit, executor_);
      CHECK_EQ(table_infos.size(), size_t(1));

      const auto co = get_compilation_options(ExecutorDeviceType::CPU);
      const auto eo = get_execution_options();

      std::unordered_map</*fragment_id*/ int, ChunkStats> stats_map;

      size_t total_num_tuples = 0;
      Executor::PerFragmentCallBack compute_deleted_callback =
          [&stats_map, &tuple_count_map, &total_num_tuples, cd](
              ResultSetPtr results,
              const Fragmenter_Namespace::FragmentInfo& fragment_info) {
            // count number of tuples in $deleted as total number of tuples in table.
            if (cd->isDeletedCol) {
              total_num_tuples += fragment_info.getPhysicalNumTuples();
            }
            if (fragment_info.getPhysicalNumTuples() == 0) {
              // TODO(adb): Should not happen, but just to be safe...
              LOG(WARNING) << "Skipping completely empty fragment for column "
                           << cd->columnName;
              return;
            }

            const auto row = results->getNextRow(false, false);
            CHECK_EQ(row.size(), size_t(1));

            const auto& ti = cd->columnType;

            ChunkMetadata chunk_metadata;
            chunk_metadata.sqlType = get_logical_type_info(ti);

            const auto count_val = read_scalar_target_value<int64_t>(row[0]);
            if (count_val == 0) {
              // Assume chunk of all nulls, bail
              return;
            }

            // min element 0 max element 1
            std::vector<TargetValue> fakerow;

            auto num_tuples = static_cast<size_t>(count_val);

            // calculate min
            if (num_tuples == fragment_info.getPhysicalNumTuples()) {
              // nothing deleted
              // min = false;
              // max = false;
              fakerow.emplace_back(TargetValue{int64_t(0)});
              fakerow.emplace_back(TargetValue{int64_t(0)});
            } else {
              if (num_tuples == 0) {
                // everything marked as delete
                // min = true
                // max = true
                fakerow.emplace_back(TargetValue{int64_t(1)});
                fakerow.emplace_back(TargetValue{int64_t(1)});
              } else {
                // some deleted
                // min = false
                // max = true;
                fakerow.emplace_back(TargetValue{int64_t(0)});
                fakerow.emplace_back(TargetValue{int64_t(1)});
              }
            }

            // place manufacture min and max in fake row to use common infra
            if (!set_metadata_from_results(chunk_metadata, fakerow, ti, false)) {
              LOG(WARNING) << "Unable to process new metadata values for column "
                           << cd->columnName;
              return;
            }

            stats_map.emplace(
                std::make_pair(fragment_info.fragmentId, chunk_metadata.chunkStats));
            tuple_count_map.emplace(std::make_pair(fragment_info.fragmentId, num_tuples));
          };

      executor_->executeWorkUnitPerFragment(
          ra_exe_unit, table_infos[0], co, eo, cat_, compute_deleted_callback);

      auto* fragmenter = td->fragmenter;
      CHECK(fragmenter);
      fragmenter->updateChunkStats(cd, stats_map);
      fragmenter->setNumRows(total_num_tuples);
    }  // finished special handling deleted column;

    // TODO(adb): Support geo
    auto col_descs = cat_.getAllColumnMetadataForTable(table_id, false, false, false);
    for (const auto& cd : col_descs) {
      const auto ti = cd->columnType;
      const auto column_id = cd->columnId;

      if (ti.is_varlen()) {
        LOG(INFO) << "Skipping varlen column " << cd->columnName;
        continue;
      }

      const auto input_col_desc =
          std::make_shared<const InputColDescriptor>(column_id, table_id, 0);
      const auto col_expr =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, table_id, column_id, 0);
      auto max_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kMAX, col_expr, false, nullptr);
      auto min_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kMIN, col_expr, false, nullptr);
      auto count_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kCOUNT, col_expr, false, nullptr);

      if (ti.is_string()) {
        const SQLTypeInfo fun_ti(kINT);
        const auto fun_expr = makeExpr<Analyzer::KeyForStringExpr>(col_expr);
        max_expr = makeExpr<Analyzer::AggExpr>(fun_ti, kMAX, fun_expr, false, nullptr);
        min_expr = makeExpr<Analyzer::AggExpr>(fun_ti, kMIN, fun_expr, false, nullptr);
      }
      const auto ra_exe_unit = build_ra_exe_unit(
          input_col_desc, {min_expr.get(), max_expr.get(), count_expr.get()});
      const auto table_infos = get_table_infos(ra_exe_unit, executor_);
      CHECK_EQ(table_infos.size(), size_t(1));

      const auto co = get_compilation_options(ExecutorDeviceType::CPU);
      const auto eo = get_execution_options();

      std::unordered_map</*fragment_id*/ int, ChunkStats> stats_map;

      Executor::PerFragmentCallBack compute_metadata_callback =
          [&stats_map, &tuple_count_map, cd](
              ResultSetPtr results,
              const Fragmenter_Namespace::FragmentInfo& fragment_info) {
            if (fragment_info.getPhysicalNumTuples() == 0) {
              // TODO(adb): Should not happen, but just to be safe...
              LOG(WARNING) << "Skipping completely empty fragment for column "
                           << cd->columnName;
              return;
            }

            const auto row = results->getNextRow(false, false);
            CHECK_EQ(row.size(), size_t(3));

            const auto& ti = cd->columnType;

            ChunkMetadata chunk_metadata;
            chunk_metadata.sqlType = get_logical_type_info(ti);

            const auto count_val = read_scalar_target_value<int64_t>(row[2]);
            if (count_val == 0) {
              // Assume chunk of all nulls, bail
              return;
            }

            bool has_nulls = true;  // default to wide
            auto tuple_count_itr = tuple_count_map.find(fragment_info.fragmentId);
            if (tuple_count_itr != tuple_count_map.end()) {
              has_nulls = !(static_cast<size_t>(count_val) == tuple_count_itr->second);
            } else {
              // no deleted column calc so use raw physical count
              has_nulls = !(static_cast<size_t>(count_val) ==
                            fragment_info.getPhysicalNumTuples());
            }

            if (!set_metadata_from_results(chunk_metadata, row, ti, has_nulls)) {
              LOG(WARNING) << "Unable to process new metadata values for column "
                           << cd->columnName;
              return;
            }

            stats_map.emplace(
                std::make_pair(fragment_info.fragmentId, chunk_metadata.chunkStats));
          };

      executor_->executeWorkUnitPerFragment(
          ra_exe_unit, table_infos[0], co, eo, cat_, compute_metadata_callback);

      auto* fragmenter = td->fragmenter;
      CHECK(fragmenter);
      fragmenter->updateChunkStats(cd, stats_map);
    }
    data_mgr.checkpoint(cat_.getCurrentDB().dbId, table_id);
    executor_->clearMetaInfoCache();
  }

  data_mgr.clearMemory(Data_Namespace::MemoryLevel::CPU_LEVEL);
  if (data_mgr.gpusPresent()) {
    data_mgr.clearMemory(Data_Namespace::MemoryLevel::GPU_LEVEL);
  }
}
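The special handling of the deleted column in the listing manufactures a min and max for the boolean flag from two counts instead of scanning for them. The sketch below isolates that decision under simplifying assumptions; `DeletedFlagRange` and `deleted_flag_range` are hypothetical names, and `counted_rows` plays the role of the per-fragment count that the listing compares against the physical tuple count.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical result type: the min and max of the boolean deleted flag.
struct DeletedFlagRange {
  bool min;
  bool max;
};

// Derives the flag range for one fragment from the counted rows and the
// physical row count, mirroring the three cases named in the listing's comments.
DeletedFlagRange deleted_flag_range(std::size_t counted_rows,
                                    std::size_t physical_rows) {
  assert(counted_rows <= physical_rows);
  if (counted_rows == physical_rows) {
    return {false, false};  // nothing deleted
  }
  if (counted_rows == 0) {
    return {true, true};  // everything marked as deleted
  }
  return {false, true};  // some rows deleted
}
```

In the listing the callback returns early when the count is zero, so the all-deleted case appears not to be reached there; the sketch keeps it only because the source comments name it.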

void TableOptimizer::vacuumDeletedRows ( ) const

Compacts fragments to remove deleted rows. When a row is deleted, a boolean deleted system column is set to true. Vacuuming removes all deleted rows from a fragment. Note that vacuuming is a checkpointing operation, so data on disk will increase even though the number of rows for the current epoch has decreased.
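As an illustration of what compaction means here, the following sketch removes flagged rows from a single in-memory fragment. It is a simplified model, not the storage-layer code behind `Catalog::vacuumDeletedRows()`: `SimpleFragment` and `vacuum_fragment` are hypothetical, the fragment has one data column, and no checkpointing or on-disk layout is modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified fragment: one data column plus the deleted flag column.
struct SimpleFragment {
  std::vector<std::int64_t> values;
  std::vector<bool> deleted;
};

// Compacts the fragment in place so that only rows whose deleted flag is false
// remain, preserving their order. Returns the number of rows removed.
std::size_t vacuum_fragment(SimpleFragment& frag) {
  assert(frag.values.size() == frag.deleted.size());
  std::size_t write = 0;
  for (std::size_t read = 0; read < frag.values.size(); ++read) {
    if (frag.deleted[read]) {
      continue;  // skip rows marked as deleted
    }
    frag.values[write] = frag.values[read];
    ++write;
  }
  const std::size_t removed = frag.values.size() - write;
  frag.values.resize(write);
  frag.deleted.assign(write, false);  // every surviving row is live
  return removed;
}
```

A fragment holding 10, 20, 30, 40 with the second and fourth rows flagged would end up holding 10 and 30. In the real system the rewritten data is then checkpointed, which is why disk usage can grow even though the visible row count shrinks.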

Definition at line 344 of file TableOptimizer.cpp.

References cat_, Catalog_Namespace::Catalog::checkpoint(), TableDescriptor::tableId, td_, and Catalog_Namespace::Catalog::vacuumDeletedRows().

Referenced by MapDHandler::sql_execute_impl().

void TableOptimizer::vacuumDeletedRows() const {
  const auto table_id = td_->tableId;
  cat_.vacuumDeletedRows(table_id);
  cat_.checkpoint(table_id);
}


Member Data Documentation

const Catalog_Namespace::Catalog& TableOptimizer::cat_
private

Definition at line 57 of file TableOptimizer.h.

Referenced by recomputeMetadata(), and vacuumDeletedRows().

Executor* TableOptimizer::executor_
private

Definition at line 56 of file TableOptimizer.h.

Referenced by recomputeMetadata().

const TableDescriptor* TableOptimizer::td_
private

Definition at line 55 of file TableOptimizer.h.

Referenced by recomputeMetadata(), and vacuumDeletedRows().


The documentation for this class was generated from the following files: