flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [10/12] flink git commit: [FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and FlinkRelDecorrelator after bumping Calcite to v1.12.
Date Thu, 06 Apr 2017 19:29:17 GMT
[FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and FlinkRelDecorrelator after
bumping Calcite to v1.12.

This closes #3689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5173fa2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5173fa2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5173fa2

Branch: refs/heads/table-retraction
Commit: c5173fa26d3d8a32b0b182a37d34a8eeff6e36d0
Parents: 07f1b03
Author: Kurt Young <ykt836@gmail.com>
Authored: Thu Apr 6 22:06:51 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Apr 6 16:34:02 2017 +0200

----------------------------------------------------------------------
 .../rules/FlinkAggregateJoinTransposeRule.java  |  358 ---
 .../calcite/sql2rel/FlinkRelDecorrelator.java   | 2216 ------------------
 .../flink/table/calcite/FlinkPlannerImpl.scala  |    8 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |    4 +-
 .../batch/sql/QueryDecorrelationTest.scala      |  123 +-
 .../api/scala/batch/sql/SetOperatorsTest.scala  |   32 +-
 6 files changed, 36 insertions(+), 2705 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5173fa2/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateJoinTransposeRule.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateJoinTransposeRule.java
deleted file mode 100644
index a817c91..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateJoinTransposeRule.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.calcite.rules;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlSplittableAggFunction;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.Bug;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.apache.calcite.util.mapping.Mapping;
-import org.apache.calcite.util.mapping.Mappings;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Copied from {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}, should be
- * removed once <a href="https://issues.apache.org/jira/browse/CALCITE-1544">[CALCITE-1544] fixes.
- */
-public class FlinkAggregateJoinTransposeRule extends RelOptRule {
-	public static final FlinkAggregateJoinTransposeRule INSTANCE = new FlinkAggregateJoinTransposeRule(LogicalAggregate.class, LogicalJoin.class, RelFactories.LOGICAL_BUILDER, false);
-
-	/**
-	 * Extended instance of the rule that can push down aggregate functions.
-	 */
-	public static final FlinkAggregateJoinTransposeRule EXTENDED = new FlinkAggregateJoinTransposeRule(LogicalAggregate.class, LogicalJoin.class, RelFactories.LOGICAL_BUILDER, true);
-
-	private final boolean allowFunctions;
-
-	/**
-	 * Creates an FlinkAggregateJoinTransposeRule.
-	 */
-	public FlinkAggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass, Class<? extends Join> joinClass, RelBuilderFactory relBuilderFactory, boolean allowFunctions) {
-		super(operand(aggregateClass, null, Aggregate.IS_SIMPLE, operand(joinClass, any())), relBuilderFactory, null);
-		this.allowFunctions = allowFunctions;
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass, RelFactories.AggregateFactory aggregateFactory, Class<? extends Join> joinClass, RelFactories.JoinFactory joinFactory) {
-		this(aggregateClass, joinClass, RelBuilder.proto(aggregateFactory, joinFactory), false);
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass, RelFactories.AggregateFactory aggregateFactory, Class<? extends Join> joinClass, RelFactories.JoinFactory joinFactory, boolean allowFunctions) {
-		this(aggregateClass, joinClass, RelBuilder.proto(aggregateFactory, joinFactory), allowFunctions);
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass, RelFactories.AggregateFactory aggregateFactory, Class<? extends Join> joinClass, RelFactories.JoinFactory joinFactory, RelFactories.ProjectFactory projectFactory) {
-		this(aggregateClass, joinClass, RelBuilder.proto(aggregateFactory, joinFactory, projectFactory), false);
-	}
-
-	/**
-	 * @deprecated to be removed before 2.0
-	 */
-	@Deprecated
-	public FlinkAggregateJoinTransposeRule(Class<? extends Aggregate> aggregateClass, RelFactories.AggregateFactory aggregateFactory, Class<? extends Join> joinClass, RelFactories.JoinFactory joinFactory, RelFactories.ProjectFactory projectFactory, boolean allowFunctions) {
-		this(aggregateClass, joinClass, RelBuilder.proto(aggregateFactory, joinFactory, projectFactory), allowFunctions);
-	}
-
-	public void onMatch(RelOptRuleCall call) {
-		final Aggregate aggregate = call.rel(0);
-		final Join join = call.rel(1);
-		final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
-		final RelBuilder relBuilder = call.builder();
-
-		// If any aggregate functions do not support splitting, bail out
-		// If any aggregate call has a filter, bail out
-		for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
-			if (aggregateCall.getAggregation().unwrap(SqlSplittableAggFunction.class) == null) {
-				return;
-			}
-			if (aggregateCall.filterArg >= 0) {
-				return;
-			}
-		}
-
-		// If it is not an inner join, we do not push the
-		// aggregate operator
-		if (join.getJoinType() != JoinRelType.INNER) {
-			return;
-		}
-
-		if (!allowFunctions && !aggregate.getAggCallList().isEmpty()) {
-			return;
-		}
-
-		// Do the columns used by the join appear in the output of the aggregate?
-		final ImmutableBitSet aggregateColumns = aggregate.getGroupSet();
-		final RelMetadataQuery mq = RelMetadataQuery.instance();
-		final ImmutableBitSet keyColumns = keyColumns(aggregateColumns, mq.getPulledUpPredicates(join).pulledUpPredicates);
-		final ImmutableBitSet joinColumns = RelOptUtil.InputFinder.bits(join.getCondition());
-		final boolean allColumnsInAggregate = keyColumns.contains(joinColumns);
-		final ImmutableBitSet belowAggregateColumns = aggregateColumns.union(joinColumns);
-
-		// Split join condition
-		final List<Integer> leftKeys = Lists.newArrayList();
-		final List<Integer> rightKeys = Lists.newArrayList();
-		final List<Boolean> filterNulls = Lists.newArrayList();
-		RexNode nonEquiConj = RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(), join.getCondition(), leftKeys, rightKeys, filterNulls);
-		// If it contains non-equi join conditions, we bail out
-		if (!nonEquiConj.isAlwaysTrue()) {
-			return;
-		}
-
-		// Push each aggregate function down to each side that contains all of its
-		// arguments. Note that COUNT(*), because it has no arguments, can go to
-		// both sides.
-		final Map<Integer, Integer> map = new HashMap<>();
-		final List<Side> sides = new ArrayList<>();
-		int uniqueCount = 0;
-		int offset = 0;
-		int belowOffset = 0;
-		for (int s = 0; s < 2; s++) {
-			final Side side = new Side();
-			final RelNode joinInput = join.getInput(s);
-			int fieldCount = joinInput.getRowType().getFieldCount();
-			final ImmutableBitSet fieldSet = ImmutableBitSet.range(offset, offset + fieldCount);
-			final ImmutableBitSet belowAggregateKeyNotShifted = belowAggregateColumns.intersect(fieldSet);
-			for (Ord<Integer> c : Ord.zip(belowAggregateKeyNotShifted)) {
-				map.put(c.e, belowOffset + c.i);
-			}
-			final ImmutableBitSet belowAggregateKey = belowAggregateKeyNotShifted.shift(-offset);
-			final boolean unique;
-			if (!allowFunctions) {
-				assert aggregate.getAggCallList().isEmpty();
-				// If there are no functions, it doesn't matter as much whether we
-				// aggregate the inputs before the join, because there will not be
-				// any functions experiencing a cartesian product effect.
-				//
-				// But finding out whether the input is already unique requires a call
-				// to areColumnsUnique that currently (until [CALCITE-1048] "Make
-				// metadata more robust" is fixed) places a heavy load on
-				// the metadata system.
-				//
-				// So we choose to imagine the the input is already unique, which is
-				// untrue but harmless.
-				//
-				Util.discard(Bug.CALCITE_1048_FIXED);
-				unique = true;
-			} else {
-				final Boolean unique0 = mq.areColumnsUnique(joinInput, belowAggregateKey);
-				unique = unique0 != null && unique0;
-			}
-			if (unique) {
-				++uniqueCount;
-				side.aggregate = false;
-				side.newInput = joinInput;
-			} else {
-				side.aggregate = true;
-				List<AggregateCall> belowAggCalls = new ArrayList<>();
-				final SqlSplittableAggFunction.Registry<AggregateCall> belowAggCallRegistry = registry(belowAggCalls);
-				final Mappings.TargetMapping mapping = s == 0 ? Mappings.createIdentity(fieldCount) : Mappings.createShiftMapping(fieldCount + offset, 0, offset, fieldCount);
-				for (Ord<AggregateCall> aggCall : Ord.zip(aggregate.getAggCallList())) {
-					final SqlAggFunction aggregation = aggCall.e.getAggregation();
-					final SqlSplittableAggFunction splitter = Preconditions.checkNotNull(aggregation.unwrap(SqlSplittableAggFunction.class));
-					final AggregateCall call1;
-					if (fieldSet.contains(ImmutableBitSet.of(aggCall.e.getArgList()))) {
-						call1 = splitter.split(aggCall.e, mapping);
-					} else {
-						call1 = splitter.other(rexBuilder.getTypeFactory(), aggCall.e);
-					}
-					if (call1 != null) {
-						side.split.put(aggCall.i, belowAggregateKey.cardinality() + belowAggCallRegistry.register(call1));
-					}
-				}
-				side.newInput = relBuilder.push(joinInput).aggregate(relBuilder.groupKey(belowAggregateKey, false, null), belowAggCalls).build();
-			}
-			offset += fieldCount;
-			belowOffset += side.newInput.getRowType().getFieldCount();
-			sides.add(side);
-		}
-
-		if (uniqueCount == 2) {
-			// Both inputs to the join are unique. There is nothing to be gained by
-			// this rule. In fact, this aggregate+join may be the result of a previous
-			// invocation of this rule; if we continue we might loop forever.
-			return;
-		}
-
-		// Update condition
-		final Mapping mapping = (Mapping) Mappings.target(new Function<Integer, Integer>() {
-			public Integer apply(Integer a0) {
-				return map.get(a0);
-			}
-		}, join.getRowType().getFieldCount(), belowOffset);
-		final RexNode newCondition = RexUtil.apply(mapping, join.getCondition());
-
-		// Create new join
-		relBuilder.push(sides.get(0).newInput).push(sides.get(1).newInput).join(join.getJoinType(), newCondition);
-
-		// Aggregate above to sum up the sub-totals
-		final List<AggregateCall> newAggCalls = new ArrayList<>();
-		final int groupIndicatorCount = aggregate.getGroupCount() + aggregate.getIndicatorCount();
-		final int newLeftWidth = sides.get(0).newInput.getRowType().getFieldCount();
-		final List<RexNode> projects = new ArrayList<>(rexBuilder.identityProjects(relBuilder.peek().getRowType()));
-		for (Ord<AggregateCall> aggCall : Ord.zip(aggregate.getAggCallList())) {
-			final SqlAggFunction aggregation = aggCall.e.getAggregation();
-			final SqlSplittableAggFunction splitter = Preconditions.checkNotNull(aggregation.unwrap(SqlSplittableAggFunction.class));
-			final Integer leftSubTotal = sides.get(0).split.get(aggCall.i);
-			final Integer rightSubTotal = sides.get(1).split.get(aggCall.i);
-			newAggCalls.add(splitter.topSplit(rexBuilder, registry(projects), groupIndicatorCount, relBuilder.peek().getRowType(), aggCall.e, leftSubTotal == null ? -1 : leftSubTotal, rightSubTotal == null ? -1 : rightSubTotal + newLeftWidth));
-		}
-
-		relBuilder.project(projects);
-
-		boolean aggConvertedToProjects = false;
-		if (allColumnsInAggregate) {
-			// let's see if we can convert aggregate into projects
-			List<RexNode> projects2 = new ArrayList<>();
-			for (int key : Mappings.apply(mapping, aggregate.getGroupSet())) {
-				projects2.add(relBuilder.field(key));
-			}
-			for (AggregateCall newAggCall : newAggCalls) {
-				final SqlSplittableAggFunction splitter = newAggCall.getAggregation().unwrap(SqlSplittableAggFunction.class);
-				if (splitter != null) {
-					projects2.add(splitter.singleton(rexBuilder, relBuilder.peek().getRowType(), newAggCall));
-				}
-			}
-			if (projects2.size() == aggregate.getGroupSet().cardinality() + newAggCalls.size()) {
-				// We successfully converted agg calls into projects.
-				relBuilder.project(projects2);
-				aggConvertedToProjects = true;
-			}
-		}
-
-		if (!aggConvertedToProjects) {
-			relBuilder.aggregate(relBuilder.groupKey(Mappings.apply(mapping, aggregate.getGroupSet()), aggregate.indicator, Mappings.apply2(mapping, aggregate.getGroupSets())), newAggCalls);
-		}
-
-		call.transformTo(relBuilder.build());
-	}
-
-	/**
-	 * Computes the closure of a set of columns according to a given list of
-	 * constraints. Each 'x = y' constraint causes bit y to be set if bit x is
-	 * set, and vice versa.
-	 */
-	private static ImmutableBitSet keyColumns(ImmutableBitSet aggregateColumns, ImmutableList<RexNode> predicates) {
-		SortedMap<Integer, BitSet> equivalence = new TreeMap<>();
-		for (RexNode pred : predicates) {
-			populateEquivalences(equivalence, pred);
-		}
-		ImmutableBitSet keyColumns = aggregateColumns;
-		for (Integer aggregateColumn : aggregateColumns) {
-			final BitSet bitSet = equivalence.get(aggregateColumn);
-			if (bitSet != null) {
-				keyColumns = keyColumns.union(bitSet);
-			}
-		}
-		return keyColumns;
-	}
-
-	private static void populateEquivalences(Map<Integer, BitSet> equivalence, RexNode predicate) {
-		switch (predicate.getKind()) {
-			case EQUALS:
-				RexCall call = (RexCall) predicate;
-				final List<RexNode> operands = call.getOperands();
-				if (operands.get(0) instanceof RexInputRef) {
-					final RexInputRef ref0 = (RexInputRef) operands.get(0);
-					if (operands.get(1) instanceof RexInputRef) {
-						final RexInputRef ref1 = (RexInputRef) operands.get(1);
-						populateEquivalence(equivalence, ref0.getIndex(), ref1.getIndex());
-						populateEquivalence(equivalence, ref1.getIndex(), ref0.getIndex());
-					}
-				}
-		}
-	}
-
-	private static void populateEquivalence(Map<Integer, BitSet> equivalence, int i0, int i1) {
-		BitSet bitSet = equivalence.get(i0);
-		if (bitSet == null) {
-			bitSet = new BitSet();
-			equivalence.put(i0, bitSet);
-		}
-		bitSet.set(i1);
-	}
-
-	/**
-	 * Creates a {@link SqlSplittableAggFunction.Registry}
-	 * that is a view of a list.
-	 */
-	private static <E> SqlSplittableAggFunction.Registry<E> registry(final List<E> list) {
-		return new SqlSplittableAggFunction.Registry<E>() {
-			public int register(E e) {
-				int i = list.indexOf(e);
-				if (i < 0) {
-					i = list.size();
-					list.add(e);
-				}
-				return i;
-			}
-		};
-	}
-
-	/**
-	 * Work space for an input to a join.
-	 */
-	private static class Side {
-		final Map<Integer, Integer> split = new HashMap<>();
-		RelNode newInput;
-		boolean aggregate;
-	}
-}
-
-// End FlinkAggregateJoinTransposeRule.java


Mime
View raw message