flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [5/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
Date Mon, 23 Mar 2015 08:09:25 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 916aa27..2df08a0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -79,7 +79,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 
 			DataSet<Long> source = env.generateSequence(1, 10000);
 
@@ -120,7 +120,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchingWithMultipleDataSinks2() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 
 			DataSet<Long> source = env.generateSequence(1, 10000);
 
@@ -184,7 +184,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchingSourceMultipleTimes() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 
 			DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
 				.map(new Duplicator<Long>());
@@ -267,7 +267,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchingWithMultipleDataSinks() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 
 			DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
 					.map(new Duplicator<Long>());
@@ -815,7 +815,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testIterationWithStaticInput() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(100);
+			env.setParallelism(100);
 
 			DataSet<Long> source = env.generateSequence(1, 1000000);
 
@@ -842,7 +842,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testBranchingBroadcastVariable() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(100);
+		env.setParallelism(100);
 
 		DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
 		DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
@@ -914,7 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testMultipleIterations() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(100);
+		env.setParallelism(100);
 		
 		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
 		
@@ -943,7 +943,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testMultipleIterationsWithClosueBCVars() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(100);
+		env.setParallelism(100);
 
 		DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
 			
@@ -970,7 +970,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchesOnlyInBCVariables1() {
 		try{
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(100);
+			env.setParallelism(100);
 
 			DataSet<Long> input = env.generateSequence(1, 10);
 			DataSet<Long> bc_input = env.generateSequence(1, 10);
@@ -993,7 +993,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchesOnlyInBCVariables2() {
 		try{
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(100);
+			env.setParallelism(100);
 
 			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 3e7da6c..47efeb1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -203,7 +203,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	private Plan getTestPlanRightStatic(String strategy) {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 		
 		DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
 		
@@ -231,7 +231,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 	private Plan getTestPlanLeftStatic(String strategy) {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 		
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
index 565d992..4eed236 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -71,10 +71,10 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	public void setup() {
 		this.dataStats = new DataStatistics();
 		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
-		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+		this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
 		
 		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
-		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+		this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
deleted file mode 100644
index b17e777..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.Visitor;
-import org.junit.Test;
-
-/**
- * Tests in this class:
- * <ul>
- *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- *       parallelism between tasks is increased or decreased.
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class DOPChangeTest extends CompilerTestBase {
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
-	 * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
-	 * transit as well.
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
-	}
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
-	 * Expected to re-establish partitioning between map and reduce (hash).
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
-	}
-	
-	/**
-	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-	 * 
-	 * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
-	 * Expected to re-establish partitioning between map and reduce via a local hash.
-	 */
-	@Test
-	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
-		final int degOfPar = 2 * DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-		
-		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-		
-		Assert.assertTrue("Invalid ship strategy for an operator.", 
-				(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
-				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
-	}
-	
-	
-	
-	@Test
-	public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
-		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setDegreeOfParallelism(degOfPar * 2);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setDegreeOfParallelism(degOfPar * 2);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setDegreeOfParallelism(degOfPar * 2);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setDegreeOfParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setDegreeOfParallelism(degOfPar);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setDegreeOfParallelism(degOfPar);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-		
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
-		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
-		// mapper respectively reducer
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
-		Assert.assertTrue("The no sorting local strategy.",
-				LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
-						LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
-
-		Assert.assertTrue("The no partitioning ship strategy.",
-				ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
-						ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
-	}
-
-	/**
-	 * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
-	 * 
-	 * Test Plan:
-	 * <pre>
-	 * 
-	 * (source) -> reduce -\
-	 *                      Match -> (sink)
-	 * (source) -> reduce -/
-	 * 
-	 * </pre>
-	 * 
-	 */
-	@Test
-	public void checkPropertyHandlingWithTwoInputs() {
-		// construct the plan
-
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-			.input1(redA)
-			.input2(redB)
-			.build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-		
-		sourceA.setDegreeOfParallelism(5);
-		sourceB.setDegreeOfParallelism(7);
-		redA.setDegreeOfParallelism(5);
-		redB.setDegreeOfParallelism(7);
-		
-		mat.setDegreeOfParallelism(5);
-		
-		sink.setDegreeOfParallelism(5);
-		
-		
-		// return the PACT plan
-		Plan plan = new Plan(sink, "Partition on DoP Change");
-		
-		OptimizedPlan oPlan = compileNoStats(plan);
-		
-		JobGraphGenerator jobGen = new JobGraphGenerator();
-		
-		//Compile plan to verify that no error is thrown
-		jobGen.compileJobGraph(oPlan);
-		
-		oPlan.accept(new Visitor<PlanNode>() {
-			
-			@Override
-			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof DualInputPlanNode) {
-					DualInputPlanNode node = (DualInputPlanNode) visitable;
-					Channel c1 = node.getInput1();
-					Channel c2 = node.getInput2();
-					
-					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
-					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
-					return false;
-				}
-				return true;
-			}
-			
-			@Override
-			public void postVisit(PlanNode visitable) {
-				// DO NOTHING
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 34aa9f8..3b7eae7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -42,7 +42,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 	public void testDistinctPlain() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 					.name("source").setParallelism(6);
@@ -77,7 +77,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
 			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
 
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, combineNode.getParallelism());
 			assertEquals(8, reduceNode.getParallelism());
@@ -94,7 +94,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 	public void testDistinctWithSelectorFunctionKey() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 					.name("source").setParallelism(6);
@@ -135,7 +135,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(0), combineNode.getKeys(0));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, keyExtractor.getParallelism());
 			assertEquals(6, combineNode.getParallelism());
@@ -155,7 +155,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 	public void testDistinctWithFieldPositionKeyCombinable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 					.name("source").setParallelism(6);
@@ -191,7 +191,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(new FieldList(1), combineNode.getKeys(0));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, combineNode.getParallelism());
 			assertEquals(8, reduceNode.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index ac4f820..810ec0e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -91,7 +91,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testTwoIterationsWithMapperInbetween() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
 			
@@ -129,7 +129,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testTwoIterationsDirectlyChained() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
 			
@@ -165,7 +165,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testTwoWorksetIterationsDirectlyChained() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
 			
@@ -201,7 +201,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testIterationPushingWorkOut() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
 			
@@ -235,7 +235,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testWorksetIterationPipelineBreakerPlacement() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
 			DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
new file mode 100644
index 0000000..a54136a
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ *   <li>Tests that check the correct handling of the properties and strategies in the case where the
+ *       parallelism between tasks is increased or decreased.
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class ParallelismChangeTest extends CompilerTestBase {
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases parallelism between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
+	 * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
+	 * transit as well.
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setParallelism(degOfPar * 2);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to hash-partition the data again,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the
+		// same mapper and, subsequently (via forwarding), by the same reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
+	}
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases parallelism between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
+	 * Expected to re-establish partitioning between map and reduce (hash).
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setParallelism(degOfPar);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// map2 runs with the same parallelism as reducer 1, so its input is expected to be simply forwarded; reducer 2
+		// has twice as many instances, so the data needs to be hash-partitioned again between map2 and reducer 2 such
+		// that key/value pairs with the same key are processed by the same reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+	}
+	
+	/**
+	 * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
+	 * 
+	 * Increases parallelism between 1st reduce and 2nd map, such that more tasks are on one instance.
+	 * Expected to re-establish partitioning between map and reduce via a local hash.
+	 */
+	@Test
+	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+		final int degOfPar = 2 * DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setParallelism(degOfPar);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setParallelism(degOfPar);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setParallelism(degOfPar);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setParallelism(degOfPar * 2);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setParallelism(degOfPar * 2);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setParallelism(degOfPar * 2);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		// check the optimized Plan
+		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
+		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
+		// mapper respectively reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+		
+		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
+		
+		Assert.assertTrue("Invalid ship strategy for an operator.", 
+				(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
+				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
+	}
+	
+	
+	
+	@Test
+	public void checkPropertyHandlingWithDecreasingParallelism() {
+		final int degOfPar = DEFAULT_PARALLELISM;
+		
+		// construct the plan
+		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+		source.setParallelism(degOfPar * 2);
+		
+		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+		map1.setParallelism(degOfPar * 2);
+		map1.setInput(source);
+		
+		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+		reduce1.setParallelism(degOfPar * 2);
+		reduce1.setInput(map1);
+		
+		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+		map2.setParallelism(degOfPar);
+		map2.setInput(reduce1);
+		
+		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+		reduce2.setParallelism(degOfPar);
+		reduce2.setInput(map2);
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+		sink.setParallelism(degOfPar);
+		sink.setInput(reduce2);
+		
+		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// reducer 1 runs with twice the parallelism of map2 and reducer 2, so its partitioning cannot be reused:
+		// somewhere between reducer 1 and reducer 2 (at the input of map2 or of reducer 2) the data needs to be
+		// hash-partitioned and sorted again, such that pairs with the same key are processed by the same reducer
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+		Assert.assertTrue("The no sorting local strategy.",
+				LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+						LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+		Assert.assertTrue("The no partitioning ship strategy.",
+				ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+						ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+	}
+
+	/**
+	 * Checks that re-partitioning happens when the inputs of a two-input contract have different parallelisms.
+	 * 
+	 * Test Plan:
+	 * <pre>
+	 * 
+	 * (source) -> reduce -\
+	 *                      Match -> (sink)
+	 * (source) -> reduce -/
+	 * 
+	 * </pre>
+	 * 
+	 */
+	@Test
+	public void checkPropertyHandlingWithTwoInputs() {
+		// construct the plan
+
+		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+		
+		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+			.input(sourceA)
+			.build();
+		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+			.input(sourceB)
+			.build();
+		
+		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+			.input1(redA)
+			.input2(redB)
+			.build();
+		
+		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
+		
+		sourceA.setParallelism(5);
+		sourceB.setParallelism(7);
+		redA.setParallelism(5);
+		redB.setParallelism(7);
+		
+		mat.setParallelism(5);
+		
+		sink.setParallelism(5);
+		
+		
+		// create the plan
+		Plan plan = new Plan(sink, "Partition on DoP Change");
+		
+		OptimizedPlan oPlan = compileNoStats(plan);
+		
+		JobGraphGenerator jobGen = new JobGraphGenerator();
+		
+		//Compile plan to verify that no error is thrown
+		jobGen.compileJobGraph(oPlan);
+		
+		oPlan.accept(new Visitor<PlanNode>() {
+			
+			@Override
+			public boolean preVisit(PlanNode visitable) {
+				if (visitable instanceof DualInputPlanNode) {
+					DualInputPlanNode node = (DualInputPlanNode) visitable;
+					Channel c1 = node.getInput1();
+					Channel c2 = node.getInput2();
+					
+					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
+					Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
+					return false;
+				}
+				return true;
+			}
+			
+			@Override
+			public void postVisit(PlanNode visitable) {
+				// DO NOTHING
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 86f01b0..31f71d1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -41,7 +41,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 	public void testPipelineBreakerWithBroadcastVariable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(64);
+			env.setParallelism(64);
 			
 			DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
 			
@@ -69,7 +69,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 	public void testPipelineBreakerBroadcastedAllReduce() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(64);
+			env.setParallelism(64);
 			
 			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
 			
@@ -103,7 +103,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 	public void testPipelineBreakerBroadcastedPartialSolution() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(64);
+			env.setParallelism(64);
 			
 			
 			DataSet<Long> initialSource = env.generateSequence(1, 10);
@@ -144,7 +144,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 		try {
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(64);
+				env.setParallelism(64);
 				
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
@@ -166,7 +166,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(64);
+				env.setParallelism(64);
 				
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
@@ -189,7 +189,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(64);
+				env.setParallelism(64);
 				
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				
@@ -212,7 +212,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			
 			{
 				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(64);
+				env.setParallelism(64);
 				
 				DataSet<Long> initialSource = env.generateSequence(1, 10);
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index 7be2b16..3cf081f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -55,7 +55,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource1() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -88,7 +88,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource2() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -121,7 +121,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource3() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -153,7 +153,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource4() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -185,7 +185,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource5() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -217,7 +217,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource6() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -249,7 +249,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedSource7() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -283,7 +283,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource1() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -317,7 +317,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource2() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -352,7 +352,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource3() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -386,7 +386,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource4() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -420,7 +420,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource5() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -454,7 +454,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource6() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -488,7 +488,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource7() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -521,7 +521,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedGroupedSource8() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -555,7 +555,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource1() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -589,7 +589,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource2() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -624,7 +624,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource3() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -658,7 +658,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource4() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -692,7 +692,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource5() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -726,7 +726,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource6() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -759,7 +759,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkSinglePartitionedOrderedSource7() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
 
@@ -793,7 +793,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkCoPartitionedSources1() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data1 =
 				env.readCsvFile("/some/path").types(Long.class, String.class);
@@ -841,7 +841,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 	public void checkCoPartitionedSources2() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		DataSource<Tuple2<Long, String>> data1 =
 				env.readCsvFile("/some/path").types(Long.class, String.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index da44b59..fd451f7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -53,7 +53,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInput() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -89,7 +89,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInputBehindMap() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -126,7 +126,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInputBehindFilter() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -163,7 +163,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -200,7 +200,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -237,7 +237,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -277,7 +277,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkCrossWithReplicatedSourceInput() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -313,7 +313,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	public void checkCrossWithReplicatedSourceInputBehindMap() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -345,13 +345,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	}
 
 	/**
-	 * Tests compiler fail for join program with replicated data source and changing DOP.
+	 * Tests compiler fail for join program with replicated data source and changing parallelism.
 	 */
 	@Test(expected = CompilerException.class)
-	public void checkJoinWithReplicatedSourceInputChangingDOP() {
+	public void checkJoinWithReplicatedSourceInputChangingparallelism() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -370,13 +370,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	}
 
 	/**
-	 * Tests compiler fail for join program with replicated data source behind map and changing DOP.
+	 * Tests compiler fail for join program with replicated data source behind map and changing parallelism.
 	 */
 	@Test(expected = CompilerException.class)
-	public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() {
+	public void checkJoinWithReplicatedSourceInputBehindMapChangingparallelism() {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -402,7 +402,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	@Test(expected = CompilerException.class)
 	public void checkJoinWithReplicatedSourceInputBehindReduce() {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
@@ -427,7 +427,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase {
 	@Test(expected = CompilerException.class)
 	public void checkJoinWithReplicatedSourceInputBehindRebalance() {
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
 				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index d397ea2..f865a9f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -45,7 +45,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
 					.rebalance();
@@ -88,7 +88,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
 					.rebalance();
@@ -115,7 +115,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Pojo> data = env.fromElements(new Pojo())
 					.rebalance();
@@ -158,7 +158,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Pojo> data = env.fromElements(new Pojo())
 					.rebalance();
@@ -185,7 +185,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Pojo> data = env.fromElements(new Pojo())
 					.rebalance();
@@ -237,7 +237,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			final int parallelism = 4;
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(parallelism);
+			env.setParallelism(parallelism);
 			
 			DataSet<Pojo> data = env.fromElements(new Pojo())
 					.rebalance();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index de02836..2f9b32f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -39,7 +39,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 	public void testDistinctPreservesPartitioningOfDistinctFields() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(4);
+			env.setParallelism(4);
 			
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
@@ -75,7 +75,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 	public void testDistinctDestroysPartitioningOfNonDistinctFields() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(4);
+			env.setParallelism(4);
 			
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index a683968..c0e2fa7 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -44,7 +44,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testAllGroupReduceNoCombiner() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
 			
@@ -59,7 +59,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
 			
 			
-			// the all-reduce has no combiner, when the DOP of the input is one
+			// the all-reduce has no combiner, when the parallelism of the input is one
 			
 			SourcePlanNode sourceNode = resolver.getNode("source");
 			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
@@ -72,7 +72,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			// check that reduce has the right strategy
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(1, sourceNode.getParallelism());
 			assertEquals(1, reduceNode.getParallelism());
 			assertEquals(1, sinkNode.getParallelism());
@@ -88,7 +88,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testAllReduceWithCombiner() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
 			
@@ -120,7 +120,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
 			assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(8, sourceNode.getParallelism());
 			assertEquals(8, combineNode.getParallelism());
 			assertEquals(1, reduceNode.getParallelism());
@@ -138,7 +138,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -171,7 +171,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(1), reduceNode.getKeys(0));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(8, reduceNode.getParallelism());
 			assertEquals(8, sinkNode.getParallelism());
@@ -187,7 +187,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testGroupedReduceWithFieldPositionKeyCombinable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -228,7 +228,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(1), combineNode.getKeys(1));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, combineNode.getParallelism());
 			assertEquals(8, reduceNode.getParallelism());
@@ -245,7 +245,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -284,7 +284,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(0), reduceNode.getKeys(0));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, keyExtractor.getParallelism());
 			
@@ -303,7 +303,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 	public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -350,7 +350,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(new FieldList(0), combineNode.getKeys(1));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, keyExtractor.getParallelism());
 			assertEquals(6, combineNode.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 37a8e81..57d2d54 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -45,7 +45,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 	public void testIdentityIteration() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(43);
+			env.setParallelism(43);
 			
 			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
 			iteration.closeWith(iteration).print();
@@ -65,7 +65,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 	public void testEmptyWorksetIteration() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(43);
+			env.setParallelism(43);
 			
 			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
 					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
@@ -93,7 +93,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 	public void testIterationWithUnionRoot() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(43);
+			env.setParallelism(43);
 			
 			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
 			
@@ -132,7 +132,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 	public void testWorksetIterationWithUnionRoot() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(43);
+			env.setParallelism(43);
 			
 			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
 					.map(new MapFunction<Long, Tuple2<Long, Long>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 0724a9f..e1b18f9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -42,7 +42,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 	public void testAllReduceNoCombiner() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
 			
@@ -61,7 +61,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
 			
 			
-			// the all-reduce has no combiner, when the DOP of the input is one
+			// the all-reduce has no combiner, when the parallelism of the input is one
 			
 			SourcePlanNode sourceNode = resolver.getNode("source");
 			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
@@ -71,7 +71,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(sourceNode, reduceNode.getInput().getSource());
 			assertEquals(reduceNode, sinkNode.getInput().getSource());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(1, sourceNode.getParallelism());
 			assertEquals(1, reduceNode.getParallelism());
 			assertEquals(1, sinkNode.getParallelism());
@@ -87,7 +87,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 	public void testAllReduceWithCombiner() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
 			
@@ -121,7 +121,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy());
 			assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(8, sourceNode.getParallelism());
 			assertEquals(8, combineNode.getParallelism());
 			assertEquals(1, reduceNode.getParallelism());
@@ -138,7 +138,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 	public void testGroupedReduceWithFieldPositionKey() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -179,7 +179,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(new FieldList(1), combineNode.getKeys(0));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, combineNode.getParallelism());
 			assertEquals(8, reduceNode.getParallelism());
@@ -196,7 +196,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 	public void testGroupedReduceWithSelectorFunctionKey() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(8);
+			env.setParallelism(8);
 			
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
@@ -243,7 +243,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(new FieldList(0), combineNode.getKeys(0));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
-			// check DOP
+			// check parallelism
 			assertEquals(6, sourceNode.getParallelism());
 			assertEquals(6, keyExtractor.getParallelism());
 			assertEquals(6, combineNode.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 8720aa7..f1c2233 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -196,7 +196,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 	public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 			
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");
@@ -245,7 +245,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 	private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+		env.setParallelism(DEFAULT_PARALLELISM);
 		
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set");

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 91d01a2..d0615b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -71,7 +71,7 @@ import static akka.dispatch.Futures.future;
  *         The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
  *         from the JobGraph's corresponding JobVertex.</li>
  *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
- *         as many ExecutionVertices as the degree of parallelism. The ExecutionVertex is identified by
+ *         as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
  *         the ExecutionJobVertex and the number of the parallel subtask</li>
  *     <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
  *         for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 8816a69..c948155 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -194,22 +194,22 @@ public class AbstractJobVertex implements java.io.Serializable {
 	}
 	
 	/**
-	 * Gets the degree of parallelism of the task.
+	 * Gets the parallelism of the task.
 	 * 
-	 * @return The degree of parallelism of the task.
+	 * @return The parallelism of the task.
 	 */
 	public int getParallelism() {
 		return parallelism;
 	}
 
 	/**
-	 * Sets the degree of parallelism for the task.
+	 * Sets the parallelism for the task.
 	 * 
-	 * @param parallelism The degree of parallelism for the task.
+	 * @param parallelism The parallelism for the task.
 	 */
 	public void setParallelism(int parallelism) {
 		if (parallelism < 1) {
-			throw new IllegalArgumentException("The degree of parallelism must be at least one.");
+			throw new IllegalArgumentException("The parallelism must be at least one.");
 		}
 		this.parallelism = parallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
index fb32a6e..47b1b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java
@@ -36,7 +36,7 @@ public enum ShipStrategyType {
 	FORWARD(false, false),
 	
 	/**
-	 * Repartitioning the data randomly, typically when the degree of parallelism between two nodes changes.
+	 * Repartitioning the data randomly, typically when the parallelism between two nodes changes.
 	 */
 	PARTITION_RANDOM(true, false),
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index dfe6b50..f0001a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -86,7 +86,7 @@ public class LocalInputSplitsTest {
 				new TestLocatableInputSplit(3, "host3")
 		};
 		
-		// This should fail with an exception, since the DOP of 2 does not
+		// This should fail with an exception, since the parallelism of 2 does not
 		// support strictly local assignment onto 3 hosts
 		try {
 			runTests(numHosts, slotsPerHost, parallelism, splits);


Mime
View raw message