flink-commits mailing list archives

From fhue...@apache.org
Subject [1/5] flink git commit: [FLINK-1656] Filter ForwardedField properties for group-at-a-time operators in Optimizer.
Date Fri, 03 Apr 2015 19:33:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master b8bb762e3 -> f36eb54ee


[FLINK-1656] Filter ForwardedField properties for group-at-a-time operators in Optimizer.

This closes #525


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dda8565e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dda8565e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dda8565e

Branch: refs/heads/master
Commit: dda8565e6b4f519ac77d66d710a5dc64f1c5740b
Parents: b8bb762
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Mar 23 11:55:34 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Apr 3 20:39:47 2015 +0200

----------------------------------------------------------------------
 docs/programming_guide.md                       |  8 +-
 .../api/java/operators/GroupReduceOperator.java |  2 +-
 .../apache/flink/optimizer/dag/CoGroupNode.java | 39 ++++++++
 .../flink/optimizer/dag/GroupCombineNode.java   | 28 ++++++
 .../flink/optimizer/dag/GroupReduceNode.java    | 28 ++++++
 .../flink/optimizer/dag/MapPartitionNode.java   | 18 ++++
 .../flink/optimizer/dag/SingleInputNode.java    | 16 +++-
 .../flink/optimizer/dag/TwoInputNode.java       | 23 ++++-
 .../flink/optimizer/dag/CoGroupNodeTest.java    | 96 ++++++++++++++++++++
 .../optimizer/dag/GroupCombineNodeTest.java     | 72 +++++++++++++++
 .../optimizer/dag/GroupReduceNodeTest.java      | 71 +++++++++++++++
 .../optimizer/dag/MapPartitionNodeTest.java     | 61 +++++++++++++
 12 files changed, 448 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 4d338d7..d69a303 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -2303,7 +2303,7 @@ env.execute()
 Semantic Annotations
 -----------
 
-Semantic annotations can be used give Flink hints about the behavior of a function. 
+Semantic annotations can be used to give Flink hints about the behavior of a function. 
 They tell the system which fields of a function's input the function reads and evaluates and
 which fields it unmodified forwards from its input to its output. 
 Semantic annotations are a powerful means to speed up execution, because they
@@ -2325,11 +2325,12 @@ The following semantic annotations are currently supported.
 Forwarded fields information declares input fields which are unmodified forwarded by a function to the same position or to another position in the output. 
 This information is used by the optimizer to infer whether a data property such as sorting or 
 partitioning is preserved by a function.
+For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.
 
 Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
 Fields that are forwarded to the same position in the output can be specified by their position. 
 The specified position must be valid for the input and output data type and have the same type.
-For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.
+For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple. 
 
 Fields which are unmodified forwarded to another position in the output are declared by specifying the
 source field in the input and the target field in the output as field expressions.
@@ -2389,7 +2390,8 @@ class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
 
 Non-forwarded fields information declares all fields which are not preserved on the same position in a function's output. 
 The values of all other fields are considered to be preserved at the same position in the output. 
-Hence, non-forwarded fields information is inverse to forwarded fields information.
+Hence, non-forwarded fields information is inverse to forwarded fields information. 
+Non-forwarded field information for group-wise operators such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition` must fulfill the same requirements as for forwarded field information.
 
 **IMPORTANT**: The specification of non-forwarded fields information is optional. However if used, 
 **ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
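
The restriction added to the documentation above can be illustrated with a small standalone Java sketch. It does not use the Flink API: the class `GroupOrderDemo`, its helpers, and the `int[]{key, value}` record layout are hypothetical names chosen only for illustration. The sketch assumes a group-wise function that forwards every record unmodified but emits the group in a different order, and shows that an ordering on a non-key field can then be lost even though each field value is forwarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone model, not Flink code: each record is int[]{key, value}.
public class GroupOrderDemo {

    // Hypothetical group-wise function: forwards every record unmodified,
    // but emits the records of the group in reverse order.
    static List<int[]> reverseGroup(List<int[]> group) {
        List<int[]> out = new ArrayList<>(group);
        Collections.reverse(out);
        return out;
    }

    // Returns true if the records are in non-decreasing order on the given field.
    static boolean isSortedOn(List<int[]> records, int field) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[field] > records.get(i)[field]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One group with key 7, sorted on the value field (index 1).
        List<int[]> group = Arrays.asList(
                new int[]{7, 1}, new int[]{7, 2}, new int[]{7, 3});
        List<int[]> out = reverseGroup(group);

        System.out.println("input sorted on value:  " + isSortedOn(group, 1));
        System.out.println("output sorted on value: " + isSortedOn(out, 1));
        System.out.println("output sorted on key:   " + isSortedOn(out, 0));
    }
}
```

Within one group the key field is constant, so any emission order keeps the order on the key, while an order on the value field is not guaranteed to survive. This is one reading of why the optimizer change in this commit keeps local properties for group-wise operators only on key fields.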

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 30f2cc4..c96b7c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -119,7 +119,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 92076c3..20bad0d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -22,13 +22,17 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.CoGroupDescriptor;
 import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
 import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * The Optimizer representation of a <i>CoGroup</i> operator.
@@ -77,6 +81,41 @@ public class CoGroupNode extends TwoInputNode {
 	}
 
 	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+
+		// Local properties for CoGroup may only be preserved on key fields.
+		DualInputSemanticProperties origProps = ((DualInputOperator<?, ?, ?, ?>) getOperator()).getSemanticProperties();
+
+		DualInputSemanticProperties filteredProps = new DualInputSemanticProperties();
+		FieldSet readSet1 = origProps.getReadFields(0);
+		FieldSet readSet2 = origProps.getReadFields(1);
+		if(readSet1 != null) {
+			filteredProps.addReadFields(0, readSet1);
+		}
+		if(readSet2 != null) {
+			filteredProps.addReadFields(1, readSet2);
+		}
+
+		// preserve only key fields (first input)
+		for(int f : this.keys1) {
+			FieldSet targets = origProps.getForwardingTargetFields(0, f);
+			for(int t : targets) {
+				filteredProps.addForwardedField(0, f, t);
+			}
+		}
+
+		// preserve only key fields (second input)
+		for(int f : this.keys2) {
+			FieldSet targets = origProps.getForwardingTargetFields(1, f);
+			for(int t : targets) {
+				filteredProps.addForwardedField(1, f, t);
+			}
+		}
+
+		return filteredProps;
+	}
+
+	@Override
 	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
 		// for CoGroup, we currently make no reasonable default estimates
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index d25fed9..766d6af 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -19,7 +19,11 @@
 package org.apache.flink.optimizer.dag;
 
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
 import org.apache.flink.optimizer.operators.GroupCombineProperties;
@@ -88,6 +92,30 @@ public class GroupCombineNode extends SingleInputNode {
 		return this.possibleProperties;
 	}
 
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+
+		// Local properties for GroupCombine may only be preserved on key fields.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+
+		// only add forward field information for key fields
+		if(this.keys != null) {
+			for (int f : this.keys) {
+				FieldSet targets = origProps.getForwardingTargetFields(0, f);
+				for (int t : targets) {
+					filteredProps.addForwardedField(f, t);
+				}
+			}
+		}
+		return filteredProps;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Estimates
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 227b75f..51da36b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -23,6 +23,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
@@ -32,6 +35,7 @@ import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupPropertie
 import org.apache.flink.optimizer.operators.GroupReduceProperties;
 import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -135,6 +139,30 @@ public class GroupReduceNode extends SingleInputNode {
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
 		return this.possibleProperties;
 	}
+
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+
+		// Local properties for GroupReduce may only be preserved on key fields.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+
+		// only add forward field information for key fields
+		if(this.keys != null) {
+			for (int f : this.keys) {
+				FieldSet targets = origProps.getForwardingTargetFields(0, f);
+				for (int t : targets) {
+					filteredProps.addForwardedField(f, t);
+				}
+			}
+		}
+		return filteredProps;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Estimates
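
The key-field filtering added to `GroupCombineNode` and `GroupReduceNode` above can be modelled without the optimizer classes. The following is a minimal sketch, not the Flink implementation: `KeyFieldFilterSketch`, `keepKeyForwards`, and `sampleForwards` are hypothetical names, and a plain map from source field to target fields stands in for `SingleInputSemanticProperties`. The sample data mirrors the forwarded fields and key columns used in the tests of this commit (forwards 0->1, 2->2, 3->4, 6->0 with keys 3 and 2).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Standalone model of "only add forward field information for key fields".
public class KeyFieldFilterSketch {

    // Keeps only those forwarding entries whose source field is a key field.
    // A null key array (no grouping keys) yields an empty result.
    static Map<Integer, Set<Integer>> keepKeyForwards(
            Map<Integer, Set<Integer>> forwards, int[] keys) {
        Map<Integer, Set<Integer>> filtered = new TreeMap<>();
        if (keys == null) {
            return filtered;
        }
        for (int key : keys) {
            Set<Integer> targets = forwards.get(key);
            if (targets != null && !targets.isEmpty()) {
                filtered.put(key, new TreeSet<>(targets));
            }
        }
        return filtered;
    }

    // Forwarded fields as declared in the tests of this commit: source -> targets.
    static Map<Integer, Set<Integer>> sampleForwards() {
        Map<Integer, Set<Integer>> forwards = new TreeMap<>();
        forwards.put(0, Collections.singleton(1));
        forwards.put(2, Collections.singleton(2));
        forwards.put(3, Collections.singleton(4));
        forwards.put(6, Collections.singleton(0));
        return forwards;
    }

    public static void main(String[] args) {
        // Key columns {3, 2}: only the forwards 2->2 and 3->4 remain.
        System.out.println(keepKeyForwards(sampleForwards(), new int[]{3, 2}));
        // prints {2=[2], 3=[4]}
    }
}
```

The result matches what the test assertions in this commit expect for the filtered properties: fields 0 and 6 lose their forwarding targets, fields 2 and 3 keep theirs. Read-field information is copied unchanged in the real code and is left out of this model.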

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
index b287c33..6914c15 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
@@ -22,10 +22,13 @@ package org.apache.flink.optimizer.dag;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * The optimizer's internal representation of a <i>MapPartition</i> operator node.
@@ -55,6 +58,21 @@ public class MapPartitionNode extends SingleInputNode {
 		return this.possibleProperties;
 	}
 
+	@Override
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+
+		// Local properties for MapPartition may not be preserved.
+		SingleInputSemanticProperties origProps =
+				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
+		FieldSet readSet = origProps.getReadFields(0);
+		if(readSet != null) {
+			filteredProps.addReadFields(readSet);
+		}
+
+		return filteredProps;
+	}
+
 	/**
 	 * Computes the estimates for the MapPartition operator.
 	 * We assume that by default, Map takes one value and transforms it into another value.

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index e9b31f4..61bee58 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -148,7 +148,14 @@ public abstract class SingleInputNode extends OptimizerNode {
 	public SemanticProperties getSemanticProperties() {
 		return getOperator().getSemanticProperties();
 	}
-	
+
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
+
+	protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
 
 	@Override
 	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
@@ -467,10 +474,9 @@ public abstract class SingleInputNode extends OptimizerNode {
 			gProps = dps.computeGlobalProperties(gProps);
 			lProps = dps.computeLocalProperties(lProps);
 
-			SemanticProperties props = this.getSemanticProperties();
 			// filter by the user code field copies
-			gProps = gProps.filterBySemanticProperties(props, 0);
-			lProps = lProps.filterBySemanticProperties(props, 0);
+			gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
+			lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);
 			
 			// apply
 			node.initProperties(gProps, lProps);
@@ -478,7 +484,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 			target.add(node);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//                                     Branch Handling
 	// --------------------------------------------------------------------------------------------
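
The structure of the `SingleInputNode` change above is a pair of overridable hooks: both return the full semantic properties by default, and a group-wise node overrides only the local one. A minimal standalone sketch of that pattern follows. All names (`FilterHookSketch`, `Node`, `GroupNode`, `forLocalFiltering`, `forGlobalFiltering`) are hypothetical, and a list of forwarded field indexes stands in for `SemanticProperties`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone model of the two filtering hooks; not the optimizer classes.
public class FilterHookSketch {

    // Stand-in for a generic optimizer node.
    static class Node {
        private final List<Integer> forwardedFields;

        Node(List<Integer> forwardedFields) {
            this.forwardedFields = new ArrayList<>(forwardedFields);
        }

        List<Integer> semanticProperties() {
            return Collections.unmodifiableList(forwardedFields);
        }

        // Default: local property filtering sees the unfiltered properties.
        protected List<Integer> forLocalFiltering() {
            return semanticProperties();
        }

        // Default: global property filtering sees the unfiltered properties.
        protected List<Integer> forGlobalFiltering() {
            return semanticProperties();
        }
    }

    // Stand-in for a group-wise node: restricts local filtering to key fields.
    static class GroupNode extends Node {
        private final List<Integer> keys;

        GroupNode(List<Integer> forwardedFields, List<Integer> keys) {
            super(forwardedFields);
            this.keys = new ArrayList<>(keys);
        }

        @Override
        protected List<Integer> forLocalFiltering() {
            List<Integer> filtered = new ArrayList<>(semanticProperties());
            filtered.retainAll(keys); // keep forwarded fields that are keys
            return filtered;
        }
    }

    public static void main(String[] args) {
        List<Integer> forwarded = Arrays.asList(0, 2, 3, 6);
        Node group = new GroupNode(forwarded, Arrays.asList(3, 2));
        System.out.println("local:  " + group.forLocalFiltering());
        System.out.println("global: " + group.forGlobalFiltering());
    }
}
```

In this model, as in the diff, only the local hook is overridden by the group-wise node. The global hook keeps its default, so global properties such as partitioning are still filtered against the full set of forwarded fields.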

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index f3122ba..76b03c1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -607,13 +607,18 @@ public abstract class TwoInputNode extends OptimizerNode {
 			DualInputPlanNode node = operator.instantiate(in1, in2, this);
 			node.setBroadcastInputs(broadcastChannelsCombination);
 
-			SemanticProperties props = this.getSemanticProperties();
-			GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
-			GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
+			SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
+			GlobalProperties gp1 = in1.getGlobalProperties().clone()
+					.filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
+			GlobalProperties gp2 = in2.getGlobalProperties().clone()
+					.filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
 			GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
 
-			LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
-			LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
+			SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
+			LocalProperties lp1 = in1.getLocalProperties().clone()
+					.filterBySemanticProperties(semPropsLocalPropFiltering, 0);
+			LocalProperties lp2 = in2.getLocalProperties().clone()
+					.filterBySemanticProperties(semPropsLocalPropFiltering, 1);
 			LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
 			
 			node.initProperties(combined, locals);
@@ -722,6 +727,14 @@ public abstract class TwoInputNode extends OptimizerNode {
 	public SemanticProperties getSemanticProperties() {
 		return getOperator().getSemanticProperties();
 	}
+
+	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
+
+	protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
+		return this.getSemanticProperties();
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//                                     Miscellaneous

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
new file mode 100644
index 0000000..96b6d03
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/CoGroupNodeTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CoGroupNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		DualInputSemanticProperties origProps = new DualInputSemanticProperties();
+		// props for first input
+		origProps.addForwardedField(0, 0, 1);
+		origProps.addForwardedField(0, 2, 2);
+		origProps.addForwardedField(0, 3, 4);
+		origProps.addForwardedField(0, 6, 0);
+		origProps.addReadFields(0, new FieldSet(0, 2, 4, 7));
+		// props for second input
+		origProps.addForwardedField(1, 1, 2);
+		origProps.addForwardedField(1, 2, 8);
+		origProps.addForwardedField(1, 3, 7);
+		origProps.addForwardedField(1, 6, 6);
+		origProps.addReadFields(1, new FieldSet(1, 3, 4));
+
+		CoGroupOperatorBase<?,?,?,?> op = mock(CoGroupOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getKeyColumns(1)).thenReturn(new int[]{6,3});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		CoGroupNode node = new CoGroupNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		// check first input props
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+		// check second input props
+		assertTrue(filteredProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 2).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 3).contains(7));
+		assertTrue(filteredProps.getForwardingTargetFields(1, 6).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(1, 6).contains(6));
+		assertTrue(filteredProps.getForwardingSourceField(1, 2) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(1, 8) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(1, 7) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(1, 6) == 6);
+
+		assertTrue(filteredProps.getReadFields(1).size() == 3);
+		assertTrue(filteredProps.getReadFields(1).contains(1));
+		assertTrue(filteredProps.getReadFields(1).contains(3));
+		assertTrue(filteredProps.getReadFields(1).contains(4));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
new file mode 100644
index 0000000..f4776a0
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GroupCombineNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addForwardedField(3, 4);
+		origProps.addForwardedField(6, 0);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		GroupCombineOperatorBase<?,?,?> op = mock(GroupCombineOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		GroupCombineNode node = new GroupCombineNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
new file mode 100644
index 0000000..da8a0b4
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupReduceNodeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GroupReduceNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addForwardedField(3, 4);
+		origProps.addForwardedField(6, 0);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		GroupReduceOperatorBase<?,?,?> op = mock(GroupReduceOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{3,2});
+		when(op.getParameters()).thenReturn(new Configuration());
+
+		GroupReduceNode node = new GroupReduceNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).contains(2));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 3).contains(4));
+		assertTrue(filteredProps.getForwardingTargetFields(0, 6).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) == 2);
+		assertTrue(filteredProps.getForwardingSourceField(0, 4) == 3);
+		assertTrue(filteredProps.getForwardingSourceField(0, 0) < 0);
+
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dda8565e/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
new file mode 100644
index 0000000..c9c6b50
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/MapPartitionNodeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MapPartitionNodeTest {
+
+	@Test
+	public void testGetSemanticProperties() {
+
+		SingleInputSemanticProperties origProps = new SingleInputSemanticProperties();
+		origProps.addForwardedField(0, 1);
+		origProps.addForwardedField(2, 2);
+		origProps.addReadFields(new FieldSet(0, 2, 4, 7));
+
+		MapPartitionOperatorBase<?,?,?> op = mock(MapPartitionOperatorBase.class);
+		when(op.getSemanticProperties()).thenReturn(origProps);
+		when(op.getKeyColumns(0)).thenReturn(new int[]{});
+
+		MapPartitionNode node = new MapPartitionNode(op);
+
+		SemanticProperties filteredProps = node.getSemanticPropertiesForLocalPropertyFiltering();
+
+		assertTrue(filteredProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(filteredProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 1) < 0);
+		assertTrue(filteredProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(filteredProps.getReadFields(0).size() == 4);
+		assertTrue(filteredProps.getReadFields(0).contains(0));
+		assertTrue(filteredProps.getReadFields(0).contains(2));
+		assertTrue(filteredProps.getReadFields(0).contains(4));
+		assertTrue(filteredProps.getReadFields(0).contains(7));
+
+	}
+
+}

