OmniSciDB  06b3bd477c
TableOptimizer Class Reference

Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanup processes that improve performance on a table. Only tables that have been modified using updates or deletes are candidates for cleanup. If the table descriptor corresponds to a sharded table, TableOptimizer processes each physical shard.

#include <TableOptimizer.h>

Public Member Functions

 TableOptimizer (const TableDescriptor *td, Executor *executor, const Catalog_Namespace::Catalog &cat)
 
void recomputeMetadata () const
 Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk metadata to become wider than the values in the chunk. Recomputing the metadata narrows the range to fit the chunk, as well as setting or unsetting the nulls flag as appropriate.
 
void vacuumDeletedRows () const
 Compacts fragments to remove deleted rows. When a row is deleted, a boolean deleted system column is set to true. Vacuuming removes all deleted rows from a fragment. Note that vacuuming is a checkpointing operation, so data on disk will increase even though the number of rows for the current epoch has decreased.
 

Private Attributes

const TableDescriptor * td_
 
Executor * executor_
 
const Catalog_Namespace::Catalog & cat_
 

Detailed Description

Driver for running cleanup processes on a table. TableOptimizer provides functions for various cleanup processes that improve performance on a table. Only tables that have been modified using updates or deletes are candidates for cleanup. If the table descriptor corresponds to a sharded table, TableOptimizer processes each physical shard.
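The shard handling described above can be sketched on its own. The code below is a minimal, self-contained C++ sketch and is not OmniSciDB code. `TableDesc`, `ShardCatalog`, and `tablesToProcess` are hypothetical stand-ins. Only the `nShards > 0` test and the "expand to physical shards, else use the table itself" rule come from the recomputeMetadata() listing.

```cpp
#include <cassert>
#include <map>
#include <vector>

// Hypothetical, simplified stand-in for TableDescriptor.
struct TableDesc {
  int tableId;
  int nShards;  // > 0 for a sharded logical table
};

// Hypothetical catalog lookup: logical table id -> physical shard descriptors.
using ShardCatalog = std::map<int, std::vector<const TableDesc*>>;

// Mirrors the dispatch at the top of recomputeMetadata(): a sharded logical
// table expands to its physical shards; any other table is processed as-is.
// shards.at() throws std::out_of_range if a sharded table has no entry.
std::vector<const TableDesc*> tablesToProcess(const TableDesc* td,
                                              const ShardCatalog& shards) {
  std::vector<const TableDesc*> result;
  if (td->nShards > 0) {
    const auto& physical = shards.at(td->tableId);
    result.insert(result.end(), physical.begin(), physical.end());
  } else {
    result.push_back(td);
  }
  return result;
}
```

For example, a logical table with two shards yields its two physical descriptors in catalog order. An unsharded table yields only itself.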

Definition at line 31 of file TableOptimizer.h.

Constructor & Destructor Documentation

TableOptimizer::TableOptimizer ( const TableDescriptor * td,
                                 Executor * executor,
                                 const Catalog_Namespace::Catalog & cat )

Definition at line 24 of file TableOptimizer.cpp.

References CHECK().

    : td_(td), executor_(executor), cat_(cat) {
  CHECK(td);
}

Member Function Documentation

void TableOptimizer::recomputeMetadata ( ) const

Recomputes per-chunk metadata for each fragment in the table. Updates and deletes can cause chunk metadata to become wider than the values in the chunk. Recomputing the metadata narrows the range to fit the chunk, as well as setting or unsetting the nulls flag as appropriate.
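Here is what "narrowing" means in practice. Suppose updates left a chunk's stored range at [0, 100] while the live values now span [-3, 9]. The sketch below recomputes a tight min/max and the nulls flag directly over an in-memory vector. It is a simplification under stated assumptions. The real implementation runs MIN, MAX, and COUNT aggregates through the executor once per fragment. `Stats`, `kNullSentinel`, and `recomputeStats` are hypothetical names. The nulls rule follows the listing below: nulls are present exactly when the non-null count differs from the number of live rows.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>
#include <vector>

// Hypothetical chunk statistics, loosely modeled on ChunkStats.
struct Stats {
  int64_t min;
  int64_t max;
  bool has_nulls;
};

// Hypothetical null sentinel for a 64-bit integer column.
constexpr int64_t kNullSentinel = std::numeric_limits<int64_t>::min();

// Recomputes tight stats for one fragment. `values` holds the live
// (non-deleted) rows; `live_rows` plays the role of tuple_count_map or the
// physical tuple count in the listing.
std::optional<Stats> recomputeStats(const std::vector<int64_t>& values,
                                    std::size_t live_rows) {
  std::size_t non_null = 0;  // equivalent of COUNT(col)
  int64_t mn = std::numeric_limits<int64_t>::max();
  int64_t mx = std::numeric_limits<int64_t>::min();
  for (int64_t v : values) {
    if (v == kNullSentinel) {
      continue;  // nulls do not contribute to MIN/MAX/COUNT
    }
    ++non_null;
    if (v < mn) mn = v;
    if (v > mx) mx = v;
  }
  if (non_null == 0) {
    return std::nullopt;  // all nulls: skip this fragment
  }
  return Stats{mn, mx, non_null != live_rows};
}
```

Returning `std::nullopt` for an all-null fragment mirrors the listing's "Assume chunk of all nulls, bail" early return. In the listing, that return means the fragment is never added to `stats_map`.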

Definition at line 115 of file TableOptimizer.cpp.

References anonymous_namespace{TableOptimizer.cpp}::build_ra_exe_unit(), cat_, CHECK(), CHECK_EQ, CHECK_GE, ColumnDescriptor::columnId, CPU, Data_Namespace::CPU_LEVEL, Catalog_Namespace::DBMetadata::dbId, executor_, anonymous_namespace{TableOptimizer.cpp}::get_compilation_options(), anonymous_namespace{TableOptimizer.cpp}::get_execution_options(), get_logical_type_info(), get_table_infos(), Catalog_Namespace::Catalog::getAllColumnMetadataForTable(), Catalog_Namespace::Catalog::getCurrentDB(), Catalog_Namespace::Catalog::getDataMgr(), Catalog_Namespace::Catalog::getDeletedColumn(), Catalog_Namespace::Catalog::getPhysicalTablesDescriptors(), Data_Namespace::GPU_LEVEL, logger::INFO, INJECT_TIMER, kCOUNT, kINT, kMAX, kMIN, LOG, TableDescriptor::nShards, anonymous_namespace{TableOptimizer.cpp}::set_metadata_from_results(), TableDescriptor::tableId, TableDescriptor::tableName, td_, and logger::WARNING.

Referenced by migrations::MigrationMgr::migrateDateInDaysMetadata().

{
  INJECT_TIMER(optimizeMetadata);
  mapd_unique_lock<mapd_shared_mutex> lock(executor_->execute_mutex_);

  LOG(INFO) << "Recomputing metadata for " << td_->tableName;

  CHECK_GE(td_->tableId, 0);

  std::vector<const TableDescriptor*> table_descriptors;
  if (td_->nShards > 0) {
    const auto physical_tds = cat_.getPhysicalTablesDescriptors(td_);
    table_descriptors.insert(
        table_descriptors.begin(), physical_tds.begin(), physical_tds.end());
  } else {
    table_descriptors.push_back(td_);
  }

  auto& data_mgr = cat_.getDataMgr();

  for (const auto td : table_descriptors) {
    ScopeGuard row_set_holder = [this] { executor_->row_set_mem_owner_ = nullptr; };
    // We can use a smaller block size here, since we won't be running projection queries
    executor_->row_set_mem_owner_ = std::make_shared<RowSetMemoryOwner>(1000000000);
    executor_->catalog_ = &cat_;
    const auto table_id = td->tableId;

    std::unordered_map</*fragment_id*/ int, size_t> tuple_count_map;

    // Special-case the $deleted column if it exists; while handling it,
    // also capture the number of non-deleted rows per fragment
    if (td->hasDeletedCol) {
      auto cd = cat_.getDeletedColumn(td);
      const auto column_id = cd->columnId;

      const auto input_col_desc =
          std::make_shared<const InputColDescriptor>(column_id, table_id, 0);
      const auto col_expr =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, table_id, column_id, 0);
      const auto count_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kCOUNT, col_expr, false, nullptr);

      const auto ra_exe_unit = build_ra_exe_unit(input_col_desc, {count_expr.get()});
      const auto table_infos = get_table_infos(ra_exe_unit, executor_);
      CHECK_EQ(table_infos.size(), size_t(1));

      const auto co = get_compilation_options(ExecutorDeviceType::CPU);
      const auto eo = get_execution_options();

      std::unordered_map</*fragment_id*/ int, ChunkStats> stats_map;

      size_t total_num_tuples = 0;
      Executor::PerFragmentCallBack compute_deleted_callback =
          [&stats_map, &tuple_count_map, &total_num_tuples, cd](
              ResultSetPtr results,
              const Fragmenter_Namespace::FragmentInfo& fragment_info) {
            // count number of tuples in $deleted as total number of tuples in table.
            if (cd->isDeletedCol) {
              total_num_tuples += fragment_info.getPhysicalNumTuples();
            }
            if (fragment_info.getPhysicalNumTuples() == 0) {
              // TODO(adb): Should not happen, but just to be safe...
              LOG(WARNING) << "Skipping completely empty fragment for column "
                           << cd->columnName;
              return;
            }

            const auto row = results->getNextRow(false, false);
            CHECK_EQ(row.size(), size_t(1));

            const auto& ti = cd->columnType;

            auto chunk_metadata = std::make_shared<ChunkMetadata>();
            chunk_metadata->sqlType = get_logical_type_info(ti);

            const auto count_val = read_scalar_target_value<int64_t>(row[0]);
            if (count_val == 0) {
              // Assume chunk of all nulls, bail
              return;
            }

            // min element 0 max element 1
            std::vector<TargetValue> fakerow;

            auto num_tuples = static_cast<size_t>(count_val);

            // calculate min and max
            if (num_tuples == fragment_info.getPhysicalNumTuples()) {
              // nothing deleted
              // min = false;
              // max = false;
              fakerow.emplace_back(TargetValue{int64_t(0)});
              fakerow.emplace_back(TargetValue{int64_t(0)});
            } else {
              if (num_tuples == 0) {
                // everything marked as deleted
                // min = true
                // max = true
                fakerow.emplace_back(TargetValue{int64_t(1)});
                fakerow.emplace_back(TargetValue{int64_t(1)});
              } else {
                // some deleted
                // min = false
                // max = true;
                fakerow.emplace_back(TargetValue{int64_t(0)});
                fakerow.emplace_back(TargetValue{int64_t(1)});
              }
            }

            // place manufactured min and max in fake row to use common infra
            if (!set_metadata_from_results(*chunk_metadata, fakerow, ti, false)) {
              LOG(WARNING) << "Unable to process new metadata values for column "
                           << cd->columnName;
              return;
            }

            stats_map.emplace(
                std::make_pair(fragment_info.fragmentId, chunk_metadata->chunkStats));
            tuple_count_map.emplace(std::make_pair(fragment_info.fragmentId, num_tuples));
          };

      executor_->executeWorkUnitPerFragment(
          ra_exe_unit, table_infos[0], co, eo, cat_, compute_deleted_callback);

      auto* fragmenter = td->fragmenter.get();
      CHECK(fragmenter);
      fragmenter->updateChunkStats(cd, stats_map);
      fragmenter->setNumRows(total_num_tuples);
    }  // finished special handling deleted column;

    // TODO(adb): Support geo
    auto col_descs = cat_.getAllColumnMetadataForTable(table_id, false, false, false);
    for (const auto& cd : col_descs) {
      const auto ti = cd->columnType;
      const auto column_id = cd->columnId;

      if (ti.is_varlen()) {
        LOG(INFO) << "Skipping varlen column " << cd->columnName;
        continue;
      }

      const auto input_col_desc =
          std::make_shared<const InputColDescriptor>(column_id, table_id, 0);
      const auto col_expr =
          makeExpr<Analyzer::ColumnVar>(cd->columnType, table_id, column_id, 0);
      auto max_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kMAX, col_expr, false, nullptr);
      auto min_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kMIN, col_expr, false, nullptr);
      auto count_expr =
          makeExpr<Analyzer::AggExpr>(cd->columnType, kCOUNT, col_expr, false, nullptr);

      if (ti.is_string()) {
        const SQLTypeInfo fun_ti(kINT);
        const auto fun_expr = makeExpr<Analyzer::KeyForStringExpr>(col_expr);
        max_expr = makeExpr<Analyzer::AggExpr>(fun_ti, kMAX, fun_expr, false, nullptr);
        min_expr = makeExpr<Analyzer::AggExpr>(fun_ti, kMIN, fun_expr, false, nullptr);
      }
      const auto ra_exe_unit = build_ra_exe_unit(
          input_col_desc, {min_expr.get(), max_expr.get(), count_expr.get()});
      const auto table_infos = get_table_infos(ra_exe_unit, executor_);
      CHECK_EQ(table_infos.size(), size_t(1));

      const auto co = get_compilation_options(ExecutorDeviceType::CPU);
      const auto eo = get_execution_options();

      std::unordered_map</*fragment_id*/ int, ChunkStats> stats_map;

      Executor::PerFragmentCallBack compute_metadata_callback =
          [&stats_map, &tuple_count_map, cd](
              ResultSetPtr results,
              const Fragmenter_Namespace::FragmentInfo& fragment_info) {
            if (fragment_info.getPhysicalNumTuples() == 0) {
              // TODO(adb): Should not happen, but just to be safe...
              LOG(WARNING) << "Skipping completely empty fragment for column "
                           << cd->columnName;
              return;
            }

            const auto row = results->getNextRow(false, false);
            CHECK_EQ(row.size(), size_t(3));

            const auto& ti = cd->columnType;

            auto chunk_metadata = std::make_shared<ChunkMetadata>();
            chunk_metadata->sqlType = get_logical_type_info(ti);

            const auto count_val = read_scalar_target_value<int64_t>(row[2]);
            if (count_val == 0) {
              // Assume chunk of all nulls, bail
              return;
            }

            bool has_nulls = true;  // default to wide
            auto tuple_count_itr = tuple_count_map.find(fragment_info.fragmentId);
            if (tuple_count_itr != tuple_count_map.end()) {
              has_nulls = !(static_cast<size_t>(count_val) == tuple_count_itr->second);
            } else {
              // no deleted column calc so use raw physical count
              has_nulls = !(static_cast<size_t>(count_val) ==
                            fragment_info.getPhysicalNumTuples());
            }

            if (!set_metadata_from_results(*chunk_metadata, row, ti, has_nulls)) {
              LOG(WARNING) << "Unable to process new metadata values for column "
                           << cd->columnName;
              return;
            }

            stats_map.emplace(
                std::make_pair(fragment_info.fragmentId, chunk_metadata->chunkStats));
          };

      executor_->executeWorkUnitPerFragment(
          ra_exe_unit, table_infos[0], co, eo, cat_, compute_metadata_callback);

      auto* fragmenter = td->fragmenter.get();
      CHECK(fragmenter);
      fragmenter->updateChunkStats(cd, stats_map);
    }
    data_mgr.checkpoint(cat_.getCurrentDB().dbId, table_id);
    executor_->clearMetaInfoCache();
  }

  data_mgr.clearMemory(Data_Namespace::MemoryLevel::CPU_LEVEL);
  if (data_mgr.gpusPresent()) {
    data_mgr.clearMemory(Data_Namespace::MemoryLevel::GPU_LEVEL);
  }
}
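Because $deleted is a boolean column, its per-fragment metadata reduces to three cases. The callback above encodes these cases as a synthetic min/max row. The helper below isolates that decision as a sketch. `deletedFlagRange` is a hypothetical name. `live_rows` plays the role of `num_tuples` (the COUNT result), and `physical_rows` plays the role of `getPhysicalNumTuples()`. The values use the same encoding as the listing: 0 means not deleted and 1 means deleted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>

// Returns the synthesized {min, max} pair for a fragment's $deleted column,
// following the three branches of compute_deleted_callback.
std::pair<int64_t, int64_t> deletedFlagRange(std::size_t live_rows,
                                             std::size_t physical_rows) {
  if (live_rows == physical_rows) {
    return {0, 0};  // nothing deleted: min = false, max = false
  }
  if (live_rows == 0) {
    return {1, 1};  // every row marked deleted: min = true, max = true
  }
  return {0, 1};  // some rows deleted: min = false, max = true
}
```

In the listing, the earlier `count_val == 0` early return runs before this decision. As a result, the all-deleted branch is not reached along that path.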

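In the recomputeMetadata() listing, the `row_set_holder` ScopeGuard at the top of the per-table loop resets `executor_->row_set_mem_owner_` however each iteration ends, including by exception. The RAII pattern behind it can be sketched as follows. `ScopeGuardSketch`, `ExecutorStateSketch`, and `processTables` are hypothetical. This is not OmniSciDB's ScopeGuard implementation.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Minimal scope guard: stores a callable and runs it in the destructor.
class ScopeGuardSketch {
 public:
  template <typename F>
  explicit ScopeGuardSketch(F&& f) : fn_(std::forward<F>(f)) {}
  ~ScopeGuardSketch() {
    if (fn_) {
      fn_();
    }
  }
  ScopeGuardSketch(const ScopeGuardSketch&) = delete;
  ScopeGuardSketch& operator=(const ScopeGuardSketch&) = delete;

 private:
  std::function<void()> fn_;
};

// Hypothetical stand-in for the executor's per-query memory owner slot.
struct ExecutorStateSketch {
  std::shared_ptr<int> row_set_mem_owner;
};

// Mirrors the loop shape: install a fresh owner per table and let the guard
// release it when the iteration's scope ends.
int processTables(ExecutorStateSketch& state, int table_count) {
  int processed = 0;
  for (int i = 0; i < table_count; ++i) {
    ScopeGuardSketch reset_owner([&state] { state.row_set_mem_owner = nullptr; });
    state.row_set_mem_owner = std::make_shared<int>(i);  // fresh owner per table
    if (state.row_set_mem_owner) {
      ++processed;  // per-table work would use the owner here
    }
  }
  return processed;
}
```

After `processTables` returns, the owner slot is empty again. Each table therefore starts with its own allocator, and nothing outlives the loop.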
void TableOptimizer::vacuumDeletedRows ( ) const

Compacts fragments to remove deleted rows. When a row is deleted, a boolean deleted system column is set to true. Vacuuming removes all deleted rows from a fragment. Note that vacuuming is a checkpointing operation, so data on disk will increase even though the number of rows for the current epoch has decreased.
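Compaction itself can be pictured as a stable, in-place filter over a fragment's rows. The sketch below assumes a simplified columnar fragment with one data column stored beside the boolean $deleted column. `FragmentSketch` and `vacuumFragment` are hypothetical. In TableOptimizer, the actual work is delegated to Catalog_Namespace::Catalog::vacuumDeletedRows().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical in-memory fragment: one data column plus the $deleted column.
struct FragmentSketch {
  std::vector<int64_t> values;
  std::vector<bool> deleted;
};

// Compacts a fragment in place by dropping every row whose deleted flag is
// set, preserving the order of surviving rows. Returns the rows removed.
std::size_t vacuumFragment(FragmentSketch& frag) {
  assert(frag.values.size() == frag.deleted.size());
  std::size_t write = 0;
  for (std::size_t read = 0; read < frag.values.size(); ++read) {
    if (!frag.deleted[read]) {
      frag.values[write] = frag.values[read];
      frag.deleted[write] = false;
      ++write;
    }
  }
  const std::size_t removed = frag.values.size() - write;
  frag.values.resize(write);
  frag.deleted.resize(write);
  return removed;
}
```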

Definition at line 345 of file TableOptimizer.cpp.

References cat_, Catalog_Namespace::Catalog::checkpoint(), TableDescriptor::tableId, td_, and Catalog_Namespace::Catalog::vacuumDeletedRows().

Referenced by DBHandler::sql_execute_impl().

{
  const auto table_id = td_->tableId;
  cat_.vacuumDeletedRows(table_id);
  cat_.checkpoint(table_id);
}

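The vacuumDeletedRows() documentation notes that vacuuming grows on-disk data. A toy epoch-versioned store can illustrate why. This models an assumption and not OmniSciDB's file manager: each checkpoint is taken to write the compacted rows as a new version, and versions from earlier epochs are retained. `EpochStoreSketch` and its methods are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Toy model: every checkpoint appends a full new version of the fragment and
// keeps all older versions, so stored size never shrinks.
class EpochStoreSketch {
 public:
  explicit EpochStoreSketch(std::vector<int64_t> rows) {
    versions_.push_back(std::move(rows));  // epoch 0
  }

  // Writes the given rows as the contents of a new epoch.
  void checkpoint(std::vector<int64_t> rows) { versions_.push_back(std::move(rows)); }

  std::size_t currentEpoch() const { return versions_.size() - 1; }
  std::size_t currentRowCount() const { return versions_.back().size(); }

  // Rows stored across all retained epochs (a proxy for bytes on disk).
  std::size_t storedRowCount() const {
    std::size_t total = 0;
    for (const auto& version : versions_) {
      total += version.size();
    }
    return total;
  }

 private:
  std::vector<std::vector<int64_t>> versions_;
};
```

Consider a fragment of four rows with two deleted. After vacuuming, `store.checkpoint({10, 30})` leaves the current epoch with 2 rows, while 6 rows are stored in total.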
Member Data Documentation

const Catalog_Namespace::Catalog& TableOptimizer::cat_
private

Definition at line 57 of file TableOptimizer.h.

Referenced by recomputeMetadata(), and vacuumDeletedRows().

Executor* TableOptimizer::executor_
private

Definition at line 56 of file TableOptimizer.h.

Referenced by recomputeMetadata().

const TableDescriptor* TableOptimizer::td_
private

Definition at line 55 of file TableOptimizer.h.

Referenced by recomputeMetadata(), and vacuumDeletedRows().


The documentation for this class was generated from the following files: