flink-commits mailing list archives

From rmetz...@apache.org
Subject [03/11] git commit: Optimizer evaluates join hints from high level apis.
Date Fri, 17 Oct 2014 14:52:30 GMT
Optimizer evaluates join hints from high level apis.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c5e4b9ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c5e4b9ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c5e4b9ac

Branch: refs/heads/release-0.7
Commit: c5e4b9ac63a64ddbb6eaabca85a39233c9d074b6
Parents: bd88236
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Oct 14 17:21:10 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Oct 17 16:48:55 2014 +0200

----------------------------------------------------------------------
 .../flink/compiler/dag/BinaryUnionNode.java     |   3 +-
 .../apache/flink/compiler/dag/CoGroupNode.java  |  45 +--
 .../apache/flink/compiler/dag/CrossNode.java    |  52 +--
 .../apache/flink/compiler/dag/MatchNode.java    |  86 ++---
 .../apache/flink/compiler/dag/TwoInputNode.java |  21 +-
 .../compiler/dag/WorksetIterationNode.java      |   7 +-
 .../compiler/GroupReduceCompilationTest.java    | 367 ------------------
 .../flink/compiler/ReduceCompilationTest.java   | 261 -------------
 .../WorksetIterationsJavaApiCompilerTest.java   | 301 ---------------
 .../java/GroupReduceCompilationTest.java        | 368 +++++++++++++++++++
 .../compiler/java/JoinTranslationTest.java      | 168 +++++++++
 .../compiler/java/ReduceCompilationTest.java    | 261 +++++++++++++
 .../WorksetIterationsJavaApiCompilerTest.java   | 302 +++++++++++++++
 13 files changed, 1213 insertions(+), 1029 deletions(-)
----------------------------------------------------------------------
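
For orientation, below is a minimal sketch of how a join size hint can be expressed in the Java API (an illustration only: joinWithTiny/joinWithHuge are assumed here to be the hint-carrying join variants in this release, and their exact translation to a JoinHint value is not shown in this commit). The hint ends up on the JoinOperatorBase, and with this change MatchNode reads it via getJoinHint() when it enumerates candidate execution strategies.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, String>> small = env.fromElements(
				new Tuple2<Long, String>(1L, "a"), new Tuple2<Long, String>(2L, "b"));
		DataSet<Tuple2<Long, String>> large = env.fromElements(
				new Tuple2<Long, String>(1L, "x"), new Tuple2<Long, String>(3L, "y"));

		// joinWithHuge marks the argument as the large input, so the left side is
		// the natural broadcast candidate (assumption; compare the BROADCAST_HASH_*
		// cases in the MatchNode changes below). The optimizer now evaluates this
		// hint instead of always choosing the strategy on its own.
		small.joinWithHuge(large)
			.where(0).equalTo(0)
			.print();

		env.execute("join hint sketch");
	}
}
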


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
index 6a68626..b229a4e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
@@ -20,6 +20,7 @@
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -57,7 +58,7 @@ public class BinaryUnionNode extends TwoInputNode {
 
 	@Override
 	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return new ArrayList<OperatorDescriptorDual>();
+		return Collections.emptyList();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
index a6f1a78..5081442 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.Ordering;
@@ -35,8 +34,11 @@ import org.apache.flink.compiler.operators.OperatorDescriptorDual;
  */
 public class CoGroupNode extends TwoInputNode {
 	
+	private List<OperatorDescriptorDual> dataProperties;
+	
 	public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> pactContract) {
 		super(pactContract);
+		this.dataProperties = initializeDataProperties();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -58,23 +60,7 @@ public class CoGroupNode extends TwoInputNode {
 
 	@Override
 	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		Ordering groupOrder1 = null;
-		Ordering groupOrder2 = null;
-		
-		CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract();
-		groupOrder1 = cgc.getGroupOrderForInputOne();
-		groupOrder2 = cgc.getGroupOrderForInputTwo();
-			
-		if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) {
-			groupOrder1 = null;
-		}
-		if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) {
-			groupOrder2 = null;
-		}
-		
-		List<OperatorDescriptorDual> l = new ArrayList<OperatorDescriptorDual>(1);
-		l.add(new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2));
-		return l;
+		return this.dataProperties;
 	}
 	
 	public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
@@ -86,12 +72,29 @@ public class CoGroupNode extends TwoInputNode {
 		} else {
 			throw new IllegalArgumentException();
 		}
-		this.possibleProperties.clear();
-		this.possibleProperties.add(op);
+		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(op);
 	}
 
 	@Override
 	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
 		// for CoGroup, we currently make no reasonable default estimates
 	}
+	
+	private List<OperatorDescriptorDual> initializeDataProperties() {
+		Ordering groupOrder1 = null;
+		Ordering groupOrder2 = null;
+		
+		CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract();
+		groupOrder1 = cgc.getGroupOrderForInputOne();
+		groupOrder2 = cgc.getGroupOrderForInputTwo();
+			
+		if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) {
+			groupOrder1 = null;
+		}
+		if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) {
+			groupOrder2 = null;
+		}
+		
+		return Collections.<OperatorDescriptorDual>singletonList(new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2));
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CrossNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CrossNode.java
index 5c07667..b1adea6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CrossNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CrossNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
@@ -39,31 +38,15 @@ import org.apache.flink.configuration.Configuration;
  */
 public class CrossNode extends TwoInputNode {
 	
+	private final List<OperatorDescriptorDual> dataProperties;
+	
 	/**
 	 * Creates a new CrossNode for the given operator.
 	 * 
-	 * @param pactContract The Cross contract object.
+	 * @param operation The Cross operator object.
 	 */
-	public CrossNode(CrossOperatorBase<?, ?, ?, ?> pactContract) {
-		super(pactContract);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public CrossOperatorBase<?, ?, ?, ?> getPactContract() {
-		return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract();
-	}
-
-	@Override
-	public String getName() {
-		return "Cross";
-	}
-	
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		
-		CrossOperatorBase<?, ?, ?, ?> operation = getPactContract();
+	public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) {
+		super(operation);
 		
 		// check small / large hints to decide upon which side is to be broadcasted
 		boolean allowBCfirst = true;
@@ -93,19 +76,19 @@ public class CrossNode extends TwoInputNode {
 				throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
 			}
 			
-			return Collections.singletonList(fixedDriverStrat);
+			this.dataProperties = Collections.singletonList(fixedDriverStrat);
 		}
 		else if (operation instanceof CrossOperatorBase.CrossWithSmall) {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
 			list.add(new CrossBlockOuterSecondDescriptor(false, true));
 			list.add(new CrossStreamOuterFirstDescriptor(false, true));
-			return list;
+			this.dataProperties = list;
 		}
 		else if (operation instanceof CrossOperatorBase.CrossWithLarge) {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
 			list.add(new CrossBlockOuterFirstDescriptor(true, false));
 			list.add(new CrossStreamOuterSecondDescriptor(true, false));
-			return list;
+			this.dataProperties = list;
 		}
 		else {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
@@ -113,10 +96,27 @@ public class CrossNode extends TwoInputNode {
 			list.add(new CrossBlockOuterSecondDescriptor());
 			list.add(new CrossStreamOuterFirstDescriptor());
 			list.add(new CrossStreamOuterSecondDescriptor());
-			return list;
+			this.dataProperties = list;
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
+	@Override
+	public CrossOperatorBase<?, ?, ?, ?> getPactContract() {
+		return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	}
+
+	@Override
+	public String getName() {
+		return "Cross";
+	}
+	
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return this.dataProperties;
+	}
+
 	/**
 	 * We assume that the cardinality is the product of  the input cardinalities
 	 * and that the result width is the sum of the input widths.
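Note on the constructor logic above: the CrossWithSmall / CrossWithLarge checks are how size hints for cross reach the optimizer; presumably these correspond to the crossWithTiny / crossWithHuge variants of the Java API (this mapping is an assumption, not stated in the diff), and they restrict the candidate descriptors to those that broadcast the side hinted as small.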

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
index fca7a72..a5a05ea 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MatchNode.java
@@ -19,6 +19,7 @@
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
@@ -37,7 +38,7 @@ import org.apache.flink.configuration.Configuration;
  */
 public class MatchNode extends TwoInputNode {
 	
-	private final JoinHint joinHint;
+	private List<OperatorDescriptorDual> dataProperties;
 	
 	/**
 	 * Creates a new MatchNode for the given join operator.
@@ -46,7 +47,7 @@ public class MatchNode extends TwoInputNode {
 	 */
 	public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
 		super(joinOperatorBase);
-		this.joinHint = joinOperatorBase.getJoinHint();
+		this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
 	}
 
 	// ------------------------------------------------------------------------
@@ -68,8 +69,47 @@ public class MatchNode extends TwoInputNode {
 
 	@Override
 	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return this.dataProperties;
+	}
+	
+	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
+		OperatorDescriptorDual op;
+		if (solutionsetInputIndex == 0) {
+			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+		} else if (solutionsetInputIndex == 1) {
+			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+		} else {
+			throw new IllegalArgumentException();
+		}
+		
+		this.dataProperties = Collections.singletonList(op);
+	}
+	
+	/**
+	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
+	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
+	 * The result cardinality is hence the larger one.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
+		
+		if (this.estimatedNumRecords >= 0) {
+			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+			
+			if (width > 0) {
+				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+			}
+		}
+	}
+	
+	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
 		// see if an internal hint dictates the strategy to use
-		Configuration conf = getPactContract().getParameters();
+		Configuration conf = joinOperatorBase.getParameters();
 		String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
 
 		if (localStrategy != null) {
@@ -94,9 +134,9 @@ public class MatchNode extends TwoInputNode {
 		else {
 			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
 			
-			JoinHint hint = this.joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : this.joinHint;
+			joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
 			
-			switch (hint) {
+			switch (joinHint) {
 				case BROADCAST_HASH_FIRST:
 					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
 					break;
@@ -124,40 +164,4 @@ public class MatchNode extends TwoInputNode {
 			return list;
 		}
 	}
-	
-	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
-		OperatorDescriptorDual op;
-		if (solutionsetInputIndex == 0) {
-			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-		} else if (solutionsetInputIndex == 1) {
-			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-		} else {
-			throw new IllegalArgumentException();
-		}
-		
-		this.possibleProperties.clear();
-		this.possibleProperties.add(op);
-	}
-	
-	/**
-	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
-	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
-	 * The result cardinality is hence the larger one.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-		
-		if (this.estimatedNumRecords >= 0) {
-			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-			
-			if (width > 0) {
-				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
-			}
-		}
-	}
 }
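A quick worked example of the estimate rule documented above (hypothetical numbers, not taken from this commit): if the first input is estimated at 1,000,000 records with an average record width of 16 bytes and the second at 50,000 records with 24 bytes, the principle of inclusion gives estimatedNumRecords = max(1,000,000, 50,000) = 1,000,000, and estimatedOutputSize = (16 + 24) * 1,000,000 = 40,000,000 bytes. If either input cardinality is unknown (negative), the record estimate falls back to -1 and no output size is derived.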

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index ce9214e..b329a6e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -69,12 +69,12 @@ public abstract class TwoInputNode extends OptimizerNode {
 	
 	protected final FieldList keys2; // The set of key fields for the second input
 	
-	protected final List<OperatorDescriptorDual> possibleProperties;
-	
 	protected PactConnection input1; // The first input edge
 
 	protected PactConnection input2; // The second input edge
-		
+	
+	private List<OperatorDescriptorDual> cachedDescriptors;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -103,8 +103,6 @@ public abstract class TwoInputNode extends OptimizerNode {
 		} else if (this.keys2 != null) {
 			throw new CompilerException("Keys are set on second input, but not on first.");
 		}
-		
-		this.possibleProperties = getPossibleProperties();
 	}
 
 	// ------------------------------------------------------------------------
@@ -258,6 +256,13 @@ public abstract class TwoInputNode extends OptimizerNode {
 	
 	protected abstract List<OperatorDescriptorDual> getPossibleProperties();
 
+	private List<OperatorDescriptorDual> getProperties() {
+		if (this.cachedDescriptors == null) {
+			this.cachedDescriptors = getPossibleProperties();
+		}
+		return this.cachedDescriptors;
+	}
+	
 	@Override
 	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
 		// get what we inherit and what is preserved by our user code 
@@ -265,7 +270,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 		final InterestingProperties props2 = getInterestingProperties().filterByCodeAnnotations(this, 1);
 		
 		// add all properties relevant to this node
-		for (OperatorDescriptorDual dpd : this.possibleProperties) {
+		for (OperatorDescriptorDual dpd : getProperties()) {
 			for (GlobalPropertiesPair gp : dpd.getPossibleGlobalProperties()) {
 				// input 1
 				props1.addGlobalProperties(gp.getProperties1());
@@ -327,7 +332,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 		{
 			Set<GlobalPropertiesPair> pairsGlob = new HashSet<GlobalPropertiesPair>();
 			Set<LocalPropertiesPair> pairsLoc = new HashSet<LocalPropertiesPair>();
-			for (OperatorDescriptorDual ods : this.possibleProperties) {
+			for (OperatorDescriptorDual ods : getProperties()) {
 				pairsGlob.addAll(ods.getPossibleGlobalProperties());
 				pairsLoc.addAll(ods.getPossibleLocalProperties());
 			}
@@ -469,7 +474,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 				final Channel in2 = template2.clone();
 				ilp2.parameterizeChannel(in2);
 				
-				for (OperatorDescriptorDual dps: this.possibleProperties) {
+				for (OperatorDescriptorDual dps: getProperties()) {
 					for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
 						if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
 							lpp.getProperties2().isMetBy(in2.getLocalProperties()) )

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index bb0a7b0..2cd8294 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
@@ -68,6 +67,8 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	
 	private final GlobalProperties partitionedProperties;
 	
+	private final List<OperatorDescriptorDual> dataProperties;
+	
 	private SolutionSetNode solutionSetNode;
 	
 	private WorksetNode worksetNode;
@@ -112,7 +113,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		}
 		this.costWeight = weight; 
 		
-		this.possibleProperties.add(new WorksetOpDescriptor(this.solutionSetKeyFields));
+		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -224,7 +225,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	
 	@Override
 	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return new ArrayList<OperatorDescriptorDual>(1);
+		return this.dataProperties;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
deleted file mode 100644
index 29fc6b9..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.compiler;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.compiler.plan.OptimizedPlan;
-import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.compiler.plan.SinkPlanNode;
-import org.apache.flink.compiler.plan.SourcePlanNode;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
-
-	@Test
-	public void testAllGroupReduceNoCombiner() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
-			
-			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
-				public void reduce(Iterable<Double> values, Collector<Double> out) {}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			
-			// the all-reduce has no combiner, when the DOP of the input is one
-			
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// check wiring
-			assertEquals(sourceNode, reduceNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that reduce has the right strategy
-			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			
-			// check DOP
-			assertEquals(1, sourceNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testAllReduceWithCombiner() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
-			
-			GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
-				public void reduce(Iterable<Long> values, Collector<Long> out) {}
-			}).name("reducer");
-			
-			reduced.setCombinable(true);
-			reduced.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
-			
-			// check DOP
-			assertEquals(8, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, combineNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	
-	@Test
-	public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			data
-				.groupBy(1)
-				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// check wiring
-			assertEquals(sourceNode, reduceNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys(0));
-			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGroupedReduceWithFieldPositionKeyCombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
-					.groupBy(1)
-					.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
-			}).name("reducer");
-			
-			reduced.setCombinable(true);
-			reduced.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys(0));
-			assertEquals(new FieldList(1), combineNode.getKeys(0));
-			assertEquals(new FieldList(1), combineNode.getKeys(1));
-			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			data
-				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
-					public String getKey(Tuple2<String, Double> value) { return value.f0; }
-				})
-				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the key extractors and projectors
-			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, keyExtractor.getInput().getSource());
-			assertEquals(keyProjector, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys(0));
-			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
-				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
-					public String getKey(Tuple2<String, Double> value) { return value.f0; }
-				})
-				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
-			}).name("reducer");
-			
-			reduced.setCombinable(true);
-			reduced.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// get the key extractors and projectors
-			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
-			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, keyExtractor.getInput().getSource());
-			assertEquals(keyProjector, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys(0));
-			assertEquals(new FieldList(0), combineNode.getKeys(0));
-			assertEquals(new FieldList(0), combineNode.getKeys(1));
-			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
deleted file mode 100644
index 2ec32e6..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.compiler;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.compiler.plan.OptimizedPlan;
-import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.compiler.plan.SinkPlanNode;
-import org.apache.flink.compiler.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
-
-	@Test
-	public void testAllReduceNoCombiner() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
-			
-			data.reduce(new RichReduceFunction<Double>() {
-				
-				@Override
-				public Double reduce(Double value1, Double value2){
-					return value1 + value2;
-				}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			
-			// the all-reduce has no combiner, when the DOP of the input is one
-			
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// check wiring
-			assertEquals(sourceNode, reduceNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check DOP
-			assertEquals(1, sourceNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testAllReduceWithCombiner() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
-			
-			data.reduce(new RichReduceFunction<Long>() {
-				
-				@Override
-				public Long reduce(Long value1, Long value2){
-					return value1 + value2;
-				}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
-			
-			// check DOP
-			assertEquals(8, sourceNode.getDegreeOfParallelism());
-			assertEquals(8, combineNode.getDegreeOfParallelism());
-			assertEquals(1, reduceNode.getDegreeOfParallelism());
-			assertEquals(1, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGroupedReduceWithFieldPositionKey() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			data
-				.groupBy(1)
-				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
-				@Override
-				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
-					return null;
-				}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, combineNode.getInput().getSource());
-			assertEquals(reduceNode, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys(0));
-			assertEquals(new FieldList(1), combineNode.getKeys(0));
-			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testGroupedReduceWithSelectorFunctionKey() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
-			
-			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-				.name("source").setParallelism(6);
-			
-			data
-				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
-					public String getKey(Tuple2<String, Double> value) { return value.f0; }
-				})
-				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
-				@Override
-				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
-					return null;
-				}
-			}).name("reducer")
-			.print().name("sink");
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-			
-			// get the original nodes
-			SourcePlanNode sourceNode = resolver.getNode("source");
-			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-			SinkPlanNode sinkNode = resolver.getNode("sink");
-			
-			// get the combiner
-			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-			
-			// get the key extractors and projectors
-			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
-			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-			
-			// check wiring
-			assertEquals(sourceNode, keyExtractor.getInput().getSource());
-			assertEquals(keyProjector, sinkNode.getInput().getSource());
-			
-			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
-			
-			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys(0));
-			assertEquals(new FieldList(0), combineNode.getKeys(0));
-			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-			
-			// check DOP
-			assertEquals(6, sourceNode.getDegreeOfParallelism());
-			assertEquals(6, keyExtractor.getDegreeOfParallelism());
-			assertEquals(6, combineNode.getDegreeOfParallelism());
-			
-			assertEquals(8, reduceNode.getDegreeOfParallelism());
-			assertEquals(8, keyProjector.getDegreeOfParallelism());
-			assertEquals(8, sinkNode.getDegreeOfParallelism());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
deleted file mode 100644
index 048f712..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.compiler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.compiler.plan.DualInputPlanNode;
-import org.apache.flink.compiler.plan.OptimizedPlan;
-import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that are requesting certain specific execution
-* strategies.
-*/
-@SuppressWarnings("serial")
-public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
-	
-	private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
-	private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
-	private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
-	private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";
-
-	@Test
-	public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
-		try {
-			Plan plan = getJavaTestPlan(false, true);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-	
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			// currently, the system may partition before or after the mapper
-			ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
-			ShipStrategyType ss2 = deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
-			
-			assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) ||
-						(ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
-		
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
-		try {
-			Plan plan = getJavaTestPlan(false, false);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			// verify solution delta
-			assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
-			
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testJavaApiWithDirectSoltionSetUpdate() {
-		try {
-			Plan plan = getJavaTestPlan(true, false);
-			
-			OptimizedPlan oPlan = compileNoStats(plan);
-	
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
-			DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
-			DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET);
-			SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
-			
-			// iteration preserves partitioning in reducer, so the first partitioning is out of the loop, 
-			// the in-loop partitioning is before the final reducer
-			
-			// verify joinWithInvariant
-			assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); 
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1());
-			assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2());
-			
-			// verify joinWithSolutionSet
-			assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
-			assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());
-			
-			// verify reducer
-			assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
-			
-			
-			// verify solution delta
-			assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size());
-			assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
-			
-			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	
-	@Test
-	public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-			
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-			
-			DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-			
-			
-			DataSet<Tuple3<Long, Long, Long>> result = 
-			
-			iter.getWorkset().join(invariantInput)
-				.where(1, 2)
-				.equalTo(1, 2)
-				.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
-					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-						return first;
-					}
-				});
-			
-			try {
-			result.join(iter.getSolutionSet())
-				.where(1, 0)
-				.equalTo(0, 2)
-				.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
-					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-						return second;
-					}
-				});
-				fail("The join should be rejected with key type mismatches.");
-			}
-			catch (InvalidProgramException e) {
-				// expected!
-			}
-			
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test errored: " + e.getMessage());
-		}
-	}
-	
-	private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-		
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset");
-		@SuppressWarnings("unchecked")
-		DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input");
-		
-		DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2);
-		
-		
-		DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet = 
-		
-		iter.getWorkset().join(invariantInput)
-			.where(1, 2)
-			.equalTo(1, 2)
-			.with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
-				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-					return first;
-				}
-			})
-			.name(JOIN_WITH_INVARIANT_NAME)
-		
-		.join(iter.getSolutionSet())
-			.where(1, 0)
-			.equalTo(1, 2)
-			.with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
-				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
-					return second;
-				}
-			})
-			.name(JOIN_WITH_SOLUTION_SET)
-			.withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
-			
-		DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
-			.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
-				public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
-			})
-			.name(NEXT_WORKSET_REDUCER_NAME)
-			.withConstantSet("1->1","2->2","0->0");
-		
-		
-		DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
-				joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
-					.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
-				joinedWithSolutionSet;
-		
-		iter.closeWith(nextSolutionSet, nextWorkset)
-			.print();
-		
-		return env.createProgramPlan();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
new file mode 100644
index 0000000..454438c
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/GroupReduceCompilationTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+	@Test
+	public void testAllGroupReduceNoCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+			
+			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
+				public void reduce(Iterable<Double> values, Collector<Double> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			
+			// the all-reduce has no combiner when the DOP of the input is one
+			
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// check wiring
+			assertEquals(sourceNode, reduceNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that reduce has the right strategy
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check DOP
+			assertEquals(1, sourceNode.getDegreeOfParallelism());
+			assertEquals(1, reduceNode.getDegreeOfParallelism());
+			assertEquals(1, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAllReduceWithCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+			
+			GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
+				public void reduce(Iterable<Long> values, Collector<Long> out) {}
+			}).name("reducer");
+			
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
+			
+			// check DOP
+			assertEquals(8, sourceNode.getDegreeOfParallelism());
+			assertEquals(8, combineNode.getDegreeOfParallelism());
+			assertEquals(1, reduceNode.getDegreeOfParallelism());
+			assertEquals(1, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	
+	@Test
+	public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(1)
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// check wiring
+			assertEquals(sourceNode, reduceNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that the reducer has the right strategy (no combiner, since the reduce is not combinable)
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithFieldPositionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+					.groupBy(1)
+					.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer");
+			
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(1));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
+					public String getKey(Tuple2<String, Double> value) { return value.f0; }
+				})
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+			
+			// check that the reducer has the right strategy (no combiner, since the reduce is not combinable)
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, keyExtractor.getDegreeOfParallelism());
+			
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, keyProjector.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
+					public String getKey(Tuple2<String, Double> value) { return value.f0; }
+				})
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer");
+			
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(1));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, keyExtractor.getDegreeOfParallelism());
+			assertEquals(6, combineNode.getDegreeOfParallelism());
+			
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, keyProjector.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e4b9ac/flink-compiler/src/test/java/org/apache/flink/compiler/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/JoinTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/JoinTranslationTest.java
new file mode 100644
index 0000000..429fa83
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/JoinTranslationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinTranslationTest extends CompilerTestBase {
+
+	@Test
+	public void testBroadcastHashFirstTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_FIRST);
+			assertEquals(ShipStrategyType.BROADCAST, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBroadcastHashSecondTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_SECOND);
+			assertEquals(ShipStrategyType.FORWARD, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.BROADCAST, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionHashFirstTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_FIRST);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionHashSecondTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_SECOND);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionSortMergeTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.MERGE, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testOptimizerChoosesTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.OPTIMIZER_CHOOSES);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertTrue(DriverStrategy.HYBRIDHASH_BUILD_FIRST == node.getDriverStrategy() ||
+					DriverStrategy.HYBRIDHASH_BUILD_SECOND == node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	
+	private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Long> i1 = env.generateSequence(1, 1000);
+		DataSet<Long> i2 = env.generateSequence(1, 1000);
+		
+		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		// set statistics to the sources
+		plan.accept(new Visitor<Operator<?>>() {
+			@Override
+			public boolean preVisit(Operator<?> visitable) {
+				if (visitable instanceof GenericDataSourceBase) {
+					GenericDataSourceBase<?, ?> source = (GenericDataSourceBase<?, ?>) visitable;
+					setSourceStatistics(source, 10000000, 1000);
+				}
+				
+				return true;
+			}
+			
+			@Override
+			public void postVisit(Operator<?> visitable) {}
+		});
+		
+		OptimizedPlan op = compileWithStats(plan);
+		
+		return (DualInputPlanNode) ((SinkPlanNode) op.getDataSinks().iterator().next()).getInput().getSource();
+	}
+	
+	
+	
+	private static final class IdentityKeySelector<T> implements KeySelector<T, T> {
+		
+		@Override
+		public T getKey(T value) {
+			return value;
+		}
+	}
+}
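
For context, a minimal usage sketch (not part of the commit above) of how a join hint supplied through the Java API reaches the optimizer. The join(DataSet, JoinHint) overload, JoinHint.REPARTITION_SORT_MERGE, and the expected PARTITION_HASH ship strategies plus MERGE driver strategy are taken from JoinTranslationTest above; the tuple inputs, class name, and job name are illustrative assumptions.

// Illustrative sketch only -- not part of commit c5e4b9ac; names outside the Flink API are assumptions.
package org.apache.flink.example;

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, String>> pages = env.fromElements(
				new Tuple2<Long, String>(1L, "index"), new Tuple2<Long, String>(2L, "about"));
		DataSet<Tuple2<Long, Long>> visits = env.fromElements(
				new Tuple2<Long, Long>(1L, 100L), new Tuple2<Long, Long>(2L, 42L));

		// The hint is handed to the optimizer, which (per JoinTranslationTest#testPartitionSortMergeTest)
		// should translate it into PARTITION_HASH ship strategies on both inputs and a MERGE driver strategy.
		pages.join(visits, JoinHint.REPARTITION_SORT_MERGE)
			.where(0)
			.equalTo(0)
			.print();

		env.execute("Join hint example");
	}
}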

