OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FilterTableFunctionMultiInputTransposeRule.java
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.mapd.calcite.rel.rules;
18 
19 import org.apache.calcite.plan.RelOptCluster;
20 import org.apache.calcite.plan.RelOptRuleCall;
21 import org.apache.calcite.plan.RelOptUtil;
22 import org.apache.calcite.plan.RelRule;
23 import org.apache.calcite.plan.hep.HepRelVertex;
24 import org.apache.calcite.rel.RelNode;
25 import org.apache.calcite.rel.logical.LogicalFilter;
26 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
27 import org.apache.calcite.rel.metadata.RelColumnMapping;
28 import org.apache.calcite.rel.rules.TransformationRule;
29 import org.apache.calcite.rel.type.RelDataTypeField;
30 import org.apache.calcite.rex.RexBuilder;
31 import org.apache.calcite.rex.RexNode;
32 import org.apache.calcite.rex.RexUtil;
33 import org.apache.calcite.tools.RelBuilder;
34 import org.apache.calcite.tools.RelBuilderFactory;
35 import org.apache.calcite.util.ImmutableBitSet;
36 
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 
53  extends RelRule<FilterTableFunctionMultiInputTransposeRule.Config>
54  implements TransformationRule {
57  super(config);
58  }
59 
/**
 * Creates the rule from a bare {@link RelBuilderFactory}.
 *
 * <p>The factory is applied to {@code Config.DEFAULT}, the result is viewed as a
 * {@code Config}, and construction is delegated to another constructor of this
 * class with that configuration.
 *
 * @param relBuilderFactory builder factory to install into the default configuration
 * @deprecated to be removed before 2.0
 */
@Deprecated // to be removed before 2.0
public FilterTableFunctionMultiInputTransposeRule(RelBuilderFactory relBuilderFactory) {
  this(Config.DEFAULT
          .withRelBuilderFactory(relBuilderFactory)
          .as(Config.class));
}
64 
65  //~ Methods ----------------------------------------------------------------
66 
67  @Override
68  public void onMatch(RelOptRuleCall call) {
69  final Boolean debugMode = false;
70  LogicalFilter filter = call.rel(0);
71  LogicalTableFunctionScan funcRel = call.rel(1);
72  Set<RelColumnMapping> columnMappings = funcRel.getColumnMappings();
73  if (columnMappings == null || columnMappings.isEmpty()) {
74  // No column mapping information, so no push-down
75  // possible.
76  return;
77  }
78  /*
79  RelColumnMapping is a triple (out_idx, arg_idx, field_idx) v
80  - out_idx is the index of output
81  - arg_idx is the index of input cursor arguments (excluding all non-cursor
82  arguments)
83  - field_idx is the index of a field of the (arg_idx+1)-th cursor argument
84 
85  Below, out_idx == field_idx is assumed as simplification.
86  */
87  List<RelNode> funcInputs = funcRel.getInputs();
88  final Integer numFuncInputs = funcInputs.size();
89  if (numFuncInputs < 1) {
90  debugPrint("RETURN: funcInputs.size()=" + funcInputs.size(), debugMode);
91  return;
92  }
93 
94  List<HashMap<Integer, Integer>> columnMaps =
95  new ArrayList<HashMap<Integer, Integer>>(numFuncInputs);
96  for (Integer i = 0; i < numFuncInputs; i++) {
97  columnMaps.add(i, new HashMap<Integer, Integer>());
98  }
99 
100  for (RelColumnMapping mapping : columnMappings) {
101  debugPrint("iInputRel.iInputColumn: mapping.iOutputColumn=" + mapping.iInputRel
102  + "." + mapping.iInputColumn + ": " + mapping.iOutputColumn,
103  debugMode);
104  if (mapping.derived) {
105  return;
106  }
107  columnMaps.get(mapping.iInputRel).put(mapping.iOutputColumn, mapping.iInputColumn);
108  }
109 
110  final List<RelNode> newFuncInputs = new ArrayList<>();
111  final RelOptCluster cluster = funcRel.getCluster();
112  final RexNode condition = filter.getCondition();
113  debugPrint("condition=" + condition, debugMode);
114  // create filters on top of each func input, modifying the filter
115  // condition to reference the child instead
116  // origFields is a list of output columns, e.g. [#0: x INTEGER, #1: d INTEGER]
117  List<RelDataTypeField> outputFields = funcRel.getRowType().getFieldList();
118  final Integer numOutputs = outputFields.size();
119 
120  // For a filter to be removed from the top (table function) node, it must be...
121 
122  List<RexNode> outputConjunctivePredicates = RelOptUtil.conjunctions(condition);
123  final Integer numConjunctivePredicates = outputConjunctivePredicates.size();
124  int[] outputColPushdownCount = new int[numOutputs];
125  int[] successfulFilterPushDowns = new int[numConjunctivePredicates];
126  int[] failedFilterPushDowns = new int[numConjunctivePredicates];
127 
128  Integer inputRelIdx = 0;
129  Boolean didPushDown = false;
130 
131  // Iterate through each table function input (for us, expected to be cursors)
132  for (RelNode funcInput : funcInputs) {
133  final List<RelDataTypeField> inputFields = funcInput.getRowType().getFieldList();
134  debugPrint("inputFields=" + inputFields, debugMode);
135  List<RelDataTypeField> validInputFields = new ArrayList<RelDataTypeField>();
136  List<RelDataTypeField> validOutputFields = new ArrayList<RelDataTypeField>();
137  int[] adjustments = new int[numOutputs];
138  List<RexNode> filtersToBePushedDown = new ArrayList<>();
139  Set<Integer> uniquePushedDownOutputIdxs = new HashSet<Integer>();
140  Set<Integer> seenOutputIdxs = new HashSet<Integer>();
141 
142  for (Map.Entry<Integer, Integer> outputInputColMapping :
143  columnMaps.get(inputRelIdx).entrySet()) {
144  final Integer inputColIdx = outputInputColMapping.getValue();
145  final Integer outputColIdx = outputInputColMapping.getKey();
146  validInputFields.add(inputFields.get(inputColIdx));
147  validOutputFields.add(outputFields.get(outputColIdx));
148  adjustments[outputColIdx] = inputColIdx - outputColIdx;
149  }
150  debugPrint("validInputFields: " + validInputFields, debugMode);
151  debugPrint("validOutputFields: " + validOutputFields, debugMode);
152  debugPrint("adjustments=" + Arrays.toString(adjustments), debugMode);
153  Boolean anyFilterRefsPartiallyMapToInputs = false;
154  List<Boolean> subFiltersDidMapToAnyInputs = new ArrayList<Boolean>();
155 
156  for (RexNode conjunctiveFilter : outputConjunctivePredicates) {
157  ImmutableBitSet filterRefs = RelOptUtil.InputFinder.bits(conjunctiveFilter);
158  final List<Integer> filterRefColIdxList = filterRefs.toList();
159  Boolean anyFilterColsPresentInInput = false;
160  Boolean allFilterColsPresentInInput = true;
161  for (Integer filterRefColIdx : filterRefColIdxList) {
162  debugPrint("filterColIdx: " + filterRefColIdx, debugMode);
163  if (!(columnMaps.get(inputRelIdx).containsKey(filterRefColIdx))) {
164  allFilterColsPresentInInput = false;
165  } else {
166  anyFilterColsPresentInInput = true;
167  uniquePushedDownOutputIdxs.add(filterRefColIdx);
168  seenOutputIdxs.add(filterRefColIdx);
169  }
170  }
171  subFiltersDidMapToAnyInputs.add(anyFilterColsPresentInInput);
172  if (anyFilterColsPresentInInput) {
173  if (allFilterColsPresentInInput) {
174  filtersToBePushedDown.add(conjunctiveFilter);
175  } else {
176  // This means that for a single conjunctive predicate, some but not all output
177  // columns mapped to inputs. Ex. x > 5 AND y < 3, where x is a mappable column
178  // but y is not This means that it is semantically unsafe to pushdown any
179  // filters to this input
180  anyFilterRefsPartiallyMapToInputs = true;
181  break;
182  }
183  }
184  }
185  debugPrint("Input idx: " + inputRelIdx
186  + " Any filter refs partially map to inputs: "
187  + anyFilterRefsPartiallyMapToInputs,
188  debugMode);
189  debugPrint(
190  "# Filters to be pushed down: " + filtersToBePushedDown.size(), debugMode);
191  inputRelIdx++;
192  // We need to flag filters that could not be pushed down due to partial mapping,
193  // as these need to remain on top of the table function even if they are
194  // successfully pushed down to other inputs
195  if (anyFilterRefsPartiallyMapToInputs) {
196  for (Integer filterIdx = 0; filterIdx < numConjunctivePredicates; filterIdx++) {
197  if (subFiltersDidMapToAnyInputs.get(filterIdx)) {
198  failedFilterPushDowns[filterIdx]++;
199  }
200  }
201  debugPrint("Failed to push down input: " + inputRelIdx, debugMode);
202  newFuncInputs.add(funcInput);
203  } else {
204  if (filtersToBePushedDown.isEmpty()) {
205  debugPrint("No filters to push down: " + inputRelIdx, debugMode);
206  newFuncInputs.add(funcInput);
207  } else {
208  debugPrint("Func input at pushdown: " + funcInput, debugMode);
209  if (funcInput instanceof HepRelVertex
210  && ((HepRelVertex) funcInput).getCurrentRel()
211  instanceof LogicalFilter) {
212  debugPrint("Filter existed on input node", debugMode);
213  final HepRelVertex inputHepRelVertex = (HepRelVertex) funcInput;
214  final LogicalFilter inputFilter =
215  (LogicalFilter) (inputHepRelVertex.getCurrentRel());
216  final RexNode inputCondition = inputFilter.getCondition();
217  final List<RexNode> inputConjunctivePredicates =
218  RelOptUtil.conjunctions(inputCondition);
219  if (inputConjunctivePredicates.size() > 0) {
220  RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
221  RexNode pushdownCondition = RexUtil.composeConjunction(
222  rexBuilder, filtersToBePushedDown, false);
223  final RexNode newPushdownCondition = pushdownCondition.accept(
224  new RelOptUtil.RexInputConverter(rexBuilder,
225  validOutputFields,
226  validInputFields,
227  adjustments));
228  final List<RexNode> newPushdownConjunctivePredicates =
229  RelOptUtil.conjunctions(newPushdownCondition);
230  final Integer numOriginalPushdownConjunctivePredicates =
231  newPushdownConjunctivePredicates.size();
232  debugPrint("Output predicates: " + newPushdownConjunctivePredicates,
233  debugMode);
234  debugPrint("Input predicates: " + inputConjunctivePredicates, debugMode);
235  newPushdownConjunctivePredicates.removeAll(inputConjunctivePredicates);
236 
237  if (newPushdownConjunctivePredicates.isEmpty()) {
238  debugPrint("All filters existed on input node", debugMode);
239  newFuncInputs.add(funcInput);
240  continue;
241  }
242  if (newPushdownConjunctivePredicates.size()
243  < numOriginalPushdownConjunctivePredicates) {
244  debugPrint("Some predicates eliminated.", debugMode);
245  }
246  }
247  debugPrint("# Filters to be pushed down after prune: "
248  + filtersToBePushedDown.size(),
249  debugMode);
250  } else {
251  debugPrint("No filter detected on input node", debugMode);
252  }
253 
254  RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
255  RexNode pushdownCondition =
256  RexUtil.composeConjunction(rexBuilder, filtersToBePushedDown, false);
257  try {
258  debugPrint("Trying to push down filter", debugMode);
259  final RexNode newCondition =
260  pushdownCondition.accept(new RelOptUtil.RexInputConverter(rexBuilder,
261  validOutputFields,
262  validInputFields,
263  adjustments));
264  didPushDown = true;
265  newFuncInputs.add(LogicalFilter.create(funcInput, newCondition));
266  for (Integer pushedDownOutputIdx : uniquePushedDownOutputIdxs) {
267  outputColPushdownCount[pushedDownOutputIdx]++;
268  }
269  for (Integer filterIdx = 0; filterIdx < numConjunctivePredicates;
270  filterIdx++) {
271  if (subFiltersDidMapToAnyInputs.get(filterIdx)) {
272  successfulFilterPushDowns[filterIdx]++;
273  }
274  }
275  } catch (java.lang.ArrayIndexOutOfBoundsException e) {
276  e.printStackTrace();
277  return;
278  }
279  }
280  }
281  }
282  if (!didPushDown) {
283  debugPrint("Did not push down - returning", debugMode);
284  return;
285  }
286 
287  List<RexNode> remainingFilters = new ArrayList<>();
288  for (Integer filterIdx = 0; filterIdx < numConjunctivePredicates; filterIdx++) {
289  if (successfulFilterPushDowns[filterIdx] == 0
290  || failedFilterPushDowns[filterIdx] > 0) {
291  remainingFilters.add(outputConjunctivePredicates.get(filterIdx));
292  }
293  }
294 
295  debugPrint("Remaining filters: " + remainingFilters, debugMode);
296  // create a new UDX whose children are the filters created above
297  LogicalTableFunctionScan newTableFuncRel = LogicalTableFunctionScan.create(cluster,
298  newFuncInputs,
299  funcRel.getCall(),
300  funcRel.getElementType(),
301  funcRel.getRowType(),
302  columnMappings);
303 
304  final RelBuilder relBuilder = call.builder();
305  relBuilder.push(newTableFuncRel);
306  if (!remainingFilters.isEmpty()) {
307  relBuilder.filter(remainingFilters);
308  }
309  final RelNode outputNode = relBuilder.build();
310  debugPrint(RelOptUtil.toString(outputNode), debugMode);
311  call.transformTo(outputNode);
312  }
313 
314  private void debugPrint(String msg, Boolean debugMode) {
315  if (debugMode) {
316  System.out.println(msg);
317  }
318  }
319 
321  public interface Config extends RelRule.Config {
323  EMPTY.withOperandSupplier(b0
324  -> b0.operand(LogicalFilter.class)
325  .oneInput(b1
326  -> b1.operand(LogicalTableFunctionScan.class)
327  .anyInputs()))
328  .as(Config.class);
329 
330  @Override
333  }
334  }
335 }