flink-commits mailing list archives

From fhue...@apache.org
Subject [4/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from Record API to Java API
Date Tue, 05 May 2015 21:03:43 GMT
[FLINK-1682] Ported optimizer unit tests from Record API to Java API

This closes #627


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd96ba8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd96ba8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd96ba8d

Branch: refs/heads/master
Commit: bd96ba8d1bbdc494ac88b98a6469255572f4a9fc
Parents: adb321d
Author: Fabian Hueske <fhueske@apache.org>
Authored: Sat Apr 25 01:30:11 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue May 5 23:01:47 2015 +0200

----------------------------------------------------------------------
 .../optimizer/AdditionalOperatorsTest.java      |  64 +--
 .../optimizer/BranchingPlansCompilerTest.java   | 509 +++++++------------
 .../BroadcastVariablePipelinebreakerTest.java   |   7 +-
 .../CachedMatchStrategyCompilerTest.java        |   7 +-
 .../optimizer/CoGroupSolutionSetFirstTest.java  |   3 +-
 .../flink/optimizer/DisjointDataFlowsTest.java  |   7 +-
 .../optimizer/DistinctCompilationTest.java      |  10 +-
 .../apache/flink/optimizer/GroupOrderTest.java  | 100 ++--
 .../optimizer/HardPlansCompilationTest.java     |  55 +-
 .../flink/optimizer/IterationsCompilerTest.java |  22 +-
 .../flink/optimizer/NestedIterationsTest.java   |   9 +-
 .../flink/optimizer/ParallelismChangeTest.java  | 250 ++++-----
 .../flink/optimizer/PartitionPushdownTest.java  |   5 +-
 .../optimizer/PartitioningReusageTest.java      |  57 ++-
 .../flink/optimizer/PipelineBreakerTest.java    |  16 +-
 .../flink/optimizer/PropertyDataSourceTest.java |  49 +-
 .../apache/flink/optimizer/ReduceAllTest.java   |  31 +-
 .../optimizer/ReplicatingDataSourceTest.java    |   2 +-
 .../SemanticPropertiesAPIToPlanTest.java        |   5 +-
 .../flink/optimizer/SortPartialReuseTest.java   |  19 +-
 .../UnionBetweenDynamicAndStaticPathTest.java   |   9 +-
 .../optimizer/UnionPropertyPropagationTest.java |  48 +-
 .../flink/optimizer/UnionReplacementTest.java   |   5 +-
 .../WorksetIterationCornerCasesTest.java        |  24 +-
 .../WorksetIterationsRecordApiCompilerTest.java | 110 ++--
 .../CoGroupCustomPartitioningTest.java          |   4 +-
 ...ustomPartitioningGlobalOptimizationTest.java |   4 +-
 .../GroupingKeySelectorTranslationTest.java     |   6 +-
 .../GroupingPojoTranslationTest.java            |   8 +-
 .../GroupingTupleTranslationTest.java           |   8 +-
 .../JoinCustomPartitioningTest.java             |   4 +-
 .../optimizer/java/PartitionOperatorTest.java   |   4 +-
 .../testfunctions/IdentityCoGrouper.java        |  30 ++
 .../testfunctions/IdentityCrosser.java          |  32 ++
 .../testfunctions/IdentityGroupReducer.java     |   2 -
 .../IdentityGroupReducerCombinable.java         |  37 ++
 .../optimizer/testfunctions/IdentityJoiner.java |  32 ++
 .../flink/optimizer/util/DummyCoGroupStub.java  |  42 --
 .../flink/optimizer/util/DummyCrossStub.java    |  32 --
 .../flink/optimizer/util/DummyInputFormat.java  |  42 --
 .../flink/optimizer/util/DummyMatchStub.java    |  37 --
 .../util/DummyNonPreservingMatchStub.java       |  35 --
 .../flink/optimizer/util/DummyOutputFormat.java |  34 --
 .../flink/optimizer/util/IdentityMap.java       |  37 --
 .../flink/optimizer/util/IdentityReduce.java    |  40 --
 .../flink/optimizer/util/OperatorResolver.java  |  15 +-
 46 files changed, 752 insertions(+), 1156 deletions(-)
----------------------------------------------------------------------
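Note on the porting pattern: throughout this commit the deprecated Record API constructs (FileDataSource, the MapOperator/ReduceOperator/CrossOperator builders, FileDataSink, new Plan(sink)) are replaced by their Java API counterparts (ExecutionEnvironment, DataSet, DiscardingOutputFormat sinks, env.createProgramPlan()). The following minimal sketch (a hypothetical example, not a file from this commit) illustrates the ported test style, assuming the CompilerTestBase helpers (DEFAULT_PARALLELISM, compileNoStats) and the IdentityMapper test function that appear elsewhere in this diff; the class name PortedTestStyleSketch is invented for illustration.

package org.apache.flink.optimizer;

import static org.junit.Assert.fail;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;

// Hypothetical example class; it only illustrates the porting pattern used in this commit.
public class PortedTestStyleSketch extends CompilerTestBase {

	@Test
	public void testSimplePlanCompiles() {
		// The Java API replaces FileDataSource/FileDataSink and the Record API operator builders.
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);

		DataSet<Long> source = env.generateSequence(0, 1);

		// DiscardingOutputFormat gives the plan an explicit sink without writing files,
		// replacing print() and the DummyOutputFormat-based FileDataSinks.
		source.map(new IdentityMapper<Long>()).name("Mapper")
				.output(new DiscardingOutputFormat<Long>());

		try {
			// Translate the program to a plan and run it through the optimizer.
			JavaPlan plan = env.createProgramPlan();
			OptimizedPlan oPlan = compileNoStats(plan);
			// Further assertions on oPlan (ship strategies, sort orders, ...) would go here.
		} catch (CompilerException ce) {
			ce.printStackTrace();
			fail("The Flink optimizer is unable to compile this plan correctly.");
		}
	}
}
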


http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 0c50536..a4e74a9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -21,18 +21,14 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -41,27 +37,23 @@ import org.junit.Test;
 * Tests that validate optimizer choices when using operators that are requesting certain specific execution
 * strategies.
 */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class AdditionalOperatorsTest extends CompilerTestBase {
 
 	@Test
 	public void testCrossWithSmall() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+		DataSet<Long> set2 = env.generateSequence(0,1);
+
+		set1.crossWithTiny(set2).name("Cross")
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
 		try {
-			OptimizedPlan oPlan = compileNoStats(plan);
+			JavaPlan plan = env.createProgramPlan();
+			OptimizedPlan oPlan = compileWithStats(plan);
 			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
 			
 			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
@@ -72,27 +64,23 @@ public class AdditionalOperatorsTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
 		} catch(CompilerException ce) {
 			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
+			fail("The Flink optimizer is unable to compile this plan correctly.");
 		}
 	}
 	
 	@Test
 	public void testCrossWithLarge() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+		DataSet<Long> set2 = env.generateSequence(0,1);
+
+		set1.crossWithHuge(set2).name("Cross")
+		set1.crossWithHuge(set2).name("Cross")
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
 		try {
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 94ff41a..1d5b7c1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
@@ -41,15 +45,6 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -59,18 +54,8 @@ import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
+
+@SuppressWarnings({"serial"})
 public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 	
@@ -323,84 +308,53 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchEachContractType() {
 		try {
 			// construct the plan
-			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
-			FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
-			FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
-			
-			MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
-			
-			ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(map1)
-				.name("Reduce 1")
-				.build();
-			
-			JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceB, sourceB, sourceC)
-				.input2(sourceC)
-				.name("Match 1")
-				.build();
-			;
-			CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(sourceA)
-				.input2(sourceB)
-				.name("CoGroup 1")
-				.build();
-			
-			CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
-				.input1(reduce1)
-				.input2(cogroup1)
-				.name("Cross 1")
-				.build();
-			
-			
-			CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cross1)
-				.input2(cross1)
-				.name("CoGroup 2")
-				.build();
-			
-			CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map1)
-				.input2(match1)
-				.name("CoGroup 3")
-				.build();
-			
-			
-			MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
-			
-			CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map2)
-				.input2(match1)
-				.name("CoGroup 4")
-				.build();
-			
-			CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup2)
-				.input2(cogroup1)
-				.name("CoGroup 5")
-				.build();
-			
-			CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(reduce1)
-				.input2(cogroup4)
-				.name("CoGroup 6")
-				.build();
-			
-			CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup5)
-				.input2(cogroup6)
-				.name("CoGroup 7")
-				.build();
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
-			sink.addInput(sourceA);
-			sink.addInput(cogroup3);
-			sink.addInput(cogroup4);
-			sink.addInput(cogroup1);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching of each contract type");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> sourceA = env.generateSequence(0,1);
+			DataSet<Long> sourceB = env.generateSequence(0,1);
+			DataSet<Long> sourceC = env.generateSequence(0,1);
+
+			DataSet<Long> map1 = sourceA.map(new IdentityMapper<Long>()).name("Map 1");
+
+			DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+			DataSet<Long> join1 = sourceB.union(sourceB).union(sourceC)
+					.join(sourceC).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 1");
+
+			DataSet<Long> coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 1");
+
+			DataSet<Long> cross1 = reduce1.cross(coGroup1)
+					.with(new IdentityCrosser<Long>()).name("Cross 1");
+
+			DataSet<Long> coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 2");
+
+			DataSet<Long> coGroup3 = map1.coGroup(join1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 3");
+
+			DataSet<Long> map2 = coGroup3.map(new IdentityMapper<Long>()).name("Map 2");
+
+			DataSet<Long> coGroup4 = map2.coGroup(join1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 4");
+
+			DataSet<Long> coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 5");
+
+			DataSet<Long> coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 6");
+
+			DataSet<Long> coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 7");
+
+			coGroup7.union(sourceA)
+					.union(coGroup3)
+					.union(coGroup4)
+					.union(coGroup1)
+					.output(new DiscardingOutputFormat<Long>());
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
 			JobGraphGenerator jobGen = new JobGraphGenerator();
@@ -418,47 +372,33 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchingUnion() {
 		try {
 			// construct the plan
-			FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			
-			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(source1)
-				.input2(source2)
-				.name("Match 1")
-				.build();
-			
-			MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
-			
-			ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(ma1)
-				.name("Reduce 1")
-				.build();
-			
-			ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(mat1)
-				.name("Reduce 2")
-				.build();
-			
-			MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
-			
-			MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
-			
-			@SuppressWarnings("unchecked")
-			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(r1, r2, ma2, ma3)
-				.input2(ma2)
-				.name("Match 2")
-				.build();
-			mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
-			
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching Union");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> source1 = env.generateSequence(0,1);
+			DataSet<Long> source2 = env.generateSequence(0,1);
+
+			DataSet<Long> join1 = source1.join(source2).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 1");
+
+			DataSet<Long> map1 = join1.map(new IdentityMapper<Long>()).name("Map 1");
+
+			DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+			DataSet<Long> reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 2");
+
+			DataSet<Long> map2 = join1.map(new IdentityMapper<Long>()).name("Map 2");
+
+			DataSet<Long> map3 = map2.map(new IdentityMapper<Long>()).name("Map 3");
+
+			DataSet<Long> join2 = reduce1.union(reduce2).union(map2).union(map3)
+					.join(map2, JoinHint.REPARTITION_SORT_MERGE).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 2");
+
+			join2.output(new DiscardingOutputFormat<Long>());
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
-			
+
 			JobGraphGenerator jobGen = new JobGraphGenerator();
 			
 			//Compile plan to verify that no error is thrown
@@ -480,22 +420,18 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testBranchingWithMultipleDataSinksSmall() {
 		try {
+			String outPath1 = "/tmp/out1";
+			String outPath2 = "/tmp/out2";
+
 			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-	
-			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-			
-			FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-			FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-			
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-			sinks.add(sinkA);
-			sinks.add(sinkB);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> source1 = env.generateSequence(0,1);
+
+			source1.writeAsText(outPath1);
+			source1.writeAsText(outPath2);
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
 			// ---------- check the optimizer plan ----------
@@ -505,15 +441,16 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			
 			// sinks contain all sink paths
 			Set<String> allSinks = new HashSet<String>();
-			allSinks.add(out1Path);
-			allSinks.add(out2Path);
+			allSinks.add(outPath1);
+			allSinks.add(outPath2);
 			
 			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
+				String path = ((TextOutputFormat<String>)n.getSinkNode().getOperator()
+						.getFormatWrapper().getUserCodeObject()).getOutputFilePath().toString();
 				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
 			}
 			
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
+			// ---------- compile plan to job graph to verify that no error is thrown ----------
 			
 			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
@@ -541,50 +478,38 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		final String out3Path = "file:///test/3";
 		final String out4Path = "file:///test/4";
 
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
-		FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
-		
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
-		sinks.add(sink4);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
+		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
+
+		sourceA.writeAsText(out1Path);
+		sourceB.writeAsText(out2Path);
+		sourceA.writeAsText(out3Path);
+		sourceB.writeAsText(out4Path);
+
+		JavaPlan plan = env.createProgramPlan();
 		compileNoStats(plan);
+
 	}
 	
 	@Test
 	public void testBranchAfterIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
-		iteration.setNextPartialSolution(mapper);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
-		
-		MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
-				.input(iteration).build();
-		
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		
-		Plan plan = new Plan(sinks);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+
+		IterativeDataSet<Long> loopHead = sourceA.iterate(10);
+		DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+		loopRes.output(new DiscardingOutputFormat<Long>());
+		loopRes.map(new IdentityMapper<Long>())
+				.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			compileNoStats(plan);
 		}
@@ -596,31 +521,20 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testBranchBeforeIteration() {
-		FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(source2);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator inMap = MapOperator.builder(new IdentityMap())
-				                       .input(source1)
-				                       .name("In Iteration Map")
-				                       .setBroadcastVariable("BC", iteration.getPartialSolution())
-				                       .build();
-		
-		iteration.setNextPartialSolution(inMap);
-		
-		MapOperator postMap = MapOperator.builder(new IdentityMap())
-										 .input(source1)
-										 .name("Post Iteration Map")
-										 .setBroadcastVariable("BC", iteration)
-										 .build();
-		
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
-		
-		Plan plan = new Plan(sink);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> source1 = env.generateSequence(0,1);
+		DataSet<Long> source2 = env.generateSequence(0,1);
+
+		IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
+		DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+		DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
+		map.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			compileNoStats(plan);
 		}
@@ -644,31 +558,22 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testClosure() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
 
-		CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
-				input1(iteration.getPartialSolution()).
-				input2(sourceB).
-				build();
+		sourceA.output(new DiscardingOutputFormat<Long>());
+		sourceB.output(new DiscardingOutputFormat<Long>());
 
-		iteration.setNextPartialSolution(stepFunction);
+		IterativeDataSet<Long> loopHead = sourceA.iterate(10).name("Loop");
 
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+		DataSet<Long> loopTail = loopHead.cross(sourceB).with(new IdentityCrosser<Long>());
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
 
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
+		loopRes.output(new DiscardingOutputFormat<Long>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -691,40 +596,24 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testClosureDeltaIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
-
-		DeltaIteration iteration = new DeltaIteration(0, "Loop");
-		iteration.setInitialSolutionSet(sourceA);
-		iteration.setInitialWorkset(sourceB);
-		iteration.setMaximumNumberOfIterations(10);
-
-		CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
-				input1(iteration.getWorkset()).
-				input2(sourceC).
-				build();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(0,1).map(new Duplicator<Long>());
+		DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(0,1).map(new Duplicator<Long>());
+		DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(0,1).map(new Duplicator<Long>());
 
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
-				name("Next solution set.").
-				input1(nextWorkset).
-				input2(iteration.getSolutionSet()).
-				build();
+		sourceA.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+		sourceC.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = sourceA.iterateDelta(sourceB, 10, 0);
 
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+		DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().cross(sourceB).with(new IdentityCrosser<Tuple2<Long, Long>>()).name("Next work set");
+		DataSet<Tuple2<Long, Long>> delta = workset.join(loop.getSolutionSet()).where(0).equalTo(0).with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
 
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
+		DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+		result.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -752,44 +641,26 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testDeltaIterationWithStaticInput() {
-		FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-		MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
-				input(source).
-				name("Identity mapped source").
-				build();
-
-		ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
-				input(source).
-				name("Identity reduce source").
-				build();
-
-		DeltaIteration iteration = new DeltaIteration(0,"Loop");
-		iteration.setMaximumNumberOfIterations(10);
-		iteration.setInitialSolutionSet(source);
-		iteration.setInitialWorkset(mappedSource);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> source = env.generateSequence(0,1).map(new Duplicator<Long>());
 
-		JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
-				input1(iteration.getWorkset()).
-				input2(reducedSource).
-				name("Next work set").
-				build();
+		DataSet<Tuple2<Long,Long>> map = source
+				.map(new IdentityMapper<Tuple2<Long, Long>>());
+		DataSet<Tuple2<Long,Long>> reduce = source
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>());
 
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
-				0).
-				input1(iteration.getSolutionSet()).
-				input2(nextWorkset).
-				name("Solution set delta").
-				build();
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = source.iterateDelta(map, 10, 0);
 
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
+		DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().join(reduce).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Next work set");
+		DataSet<Tuple2<Long, Long>> delta = loop.getSolutionSet().join(workset).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
 
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink);
+		DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+		result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -871,7 +742,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.withBroadcastSet(input3, "bc1")
 				.withBroadcastSet(input1, "bc2")
 				.withBroadcastSet(result1, "bc3")
-			.print();
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -900,7 +771,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration = initialSolution.iterate(100);
 		
 		iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
-				.print();
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -927,9 +798,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration2 = input.iterate(20);
 		IterativeDataSet<String> iteration3 = input.iterate(17);
 		
-		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
-		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
-		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
+				.output(new DiscardingOutputFormat<String>());
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
+				.output(new DiscardingOutputFormat<String>());
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3"))
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -953,9 +827,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration3 = input.iterate(17);
 		
 		
-		iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
-		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
-		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()))
+				.output(new DiscardingOutputFormat<String>());
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()))
+				.output(new DiscardingOutputFormat<String>());
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()))
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -979,7 +856,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			input
 				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
 				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
-				.print();
+				.output(new DiscardingOutputFormat<Long>());
 			
 			Plan plan = env.createProgramPlan();
 			compileNoStats(plan);
@@ -1019,7 +896,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple2<Long,Long>>())
 					.withBroadcastSet(bc_input1, "bc1")
 				.union(joinResult)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan plan = env.createProgramPlan();
 			compileNoStats(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
index 57c53ff..b0ecfe5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
@@ -43,7 +44,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
 			DataSet<String> source1 = env.fromElements("test");
 			DataSet<String> source2 = env.fromElements("test");
 			
-			source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+			source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
+					.output(new DiscardingOutputFormat<String>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -66,7 +68,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
 				
 				DataSet<String> source1 = env.fromElements("test");
 				
-				source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
+				source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
+						.output(new DiscardingOutputFormat<String>());
 				
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 1a4cd18..e795508 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -215,7 +216,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		Configuration joinStrategy = new Configuration();
 		joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
 		
-		if(strategy != "") {
+		if(!strategy.equals("")) {
 			joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
 		}
 		
@@ -223,7 +224,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 
 		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
 		
-		output.print();
+		output.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		
 		return env.createProgramPlan();
 		
@@ -250,7 +251,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 
 		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
 		
-		output.print();
+		output.output(new DiscardingOutputFormat<Tuple3<Long,Long,Long>>());
 		
 		return env.createProgramPlan();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
index 61d407a..f066b36 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -64,7 +65,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
 		DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
 
-		result.print();
+		result.output(new DiscardingOutputFormat<Tuple1<Integer>>());
 
 		Plan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index bb3aa47..68953c0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -36,8 +37,10 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
 			// generate two different flows
-			env.generateSequence(1, 10).print();
-			env.generateSequence(1, 10).print();
+			env.generateSequence(1, 10)
+					.output(new DiscardingOutputFormat<Long>());
+			env.generateSequence(1, 10)
+					.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 7865861..5827d9c 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -50,7 +51,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 
 			data
 					.distinct().name("reducer")
-					.print().name("sink");
+					.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -85,7 +86,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -104,7 +104,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 					.distinct(new KeySelector<Tuple2<String,Double>, String>() {
 						public String getKey(Tuple2<String, Double> value) { return value.f0; }
 					}).name("reducer")
-					.print().name("sink");
+					.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -146,7 +146,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -164,7 +163,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			DistinctOperator<Tuple2<String, Double>> reduced = data
 					.distinct(1).name("reducer");
 
-			reduced.print().name("sink");
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +198,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
index 76b3b0e..7328423 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
@@ -20,30 +20,24 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,25 +45,24 @@ import org.junit.Test;
  * This test case has been created to validate that correct strategies are used if orders within groups are
  * requested.
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class GroupOrderTest extends CompilerTestBase {
 
 	@Test
 	public void testReduceWithGroupOrder() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build();
-		Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING);
-		reduce.setGroupOrder(groupOrder);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink");
-		
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple4<Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake.csv")
+				.types(Long.class, Long.class, Long.class, Long.class);
+
+		set1.groupBy(1).sortGroup(3, Order.DESCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple4<Long, Long, Long, Long>>()).name("Reduce")
+				.output(new DiscardingOutputFormat<Tuple4<Long, Long, Long, Long>>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan;
+
 		try {
 			oPlan = compileNoStats(plan);
 		} catch(CompilerException ce) {
@@ -89,38 +82,35 @@ public class GroupOrderTest extends CompilerTestBase {
 		Channel c = reducer.getInput();
 		Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
 		
-		FieldList ship = new FieldList(2);
-		FieldList local = new FieldList(2, 5);
+		FieldList ship = new FieldList(1);
+		FieldList local = new FieldList(1, 3);
 		Assert.assertEquals(ship, c.getShipStrategyKeys());
 		Assert.assertEquals(local, c.getLocalStrategyKeys());
 		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
 		
 		// check that we indeed sort descending
-		Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
+		Assert.assertEquals(false, c.getLocalStrategySortOrder()[1]);
 	}
 	
 	@Test
 	public void testCoGroupWithGroupOrder() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2");
-		
-		CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
-				.keyField(LongValue.class, 0, 0)
-				.name("CoGroup").input1(source1).input2(source2).build();
-		
-		Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
-		Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
-		groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
-		coGroup.setGroupOrderForInputOne(groupOrder1);
-		coGroup.setGroupOrderForInputTwo(groupOrder2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
-		
-		Plan plan = new Plan(sink, "Reduce Group Order Test");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake1.csv")
+				.types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+		DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set2 = env.readCsvFile("/tmp/fake2.csv")
+				.types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+
+		set1.coGroup(set2).where(3,0).equalTo(6,0)
+				.sortFirstGroup(5, Order.DESCENDING)
+				.sortSecondGroup(1, Order.DESCENDING).sortSecondGroup(4, Order.ASCENDING)
+				.with(new IdentityCoGrouper<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("CoGroup")
+				.output(new DiscardingOutputFormat<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan;
+
 		try {
 			oPlan = compileNoStats(plan);
 		} catch(CompilerException ce) {
@@ -144,11 +134,11 @@ public class GroupOrderTest extends CompilerTestBase {
 		Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
 		Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
 		
-		FieldList ship1 = new FieldList(new int[] {3, 0});
-		FieldList ship2 = new FieldList(new int[] {6, 0});
+		FieldList ship1 = new FieldList(3, 0);
+		FieldList ship2 = new FieldList(6, 0);
 		
-		FieldList local1 = new FieldList(new int[] {3, 0, 5});
-		FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
+		FieldList local1 = new FieldList(3, 0, 5);
+		FieldList local2 = new FieldList(6, 0, 1, 4);
 		
 		Assert.assertEquals(ship1, c1.getShipStrategyKeys());
 		Assert.assertEquals(ship2, c2.getShipStrategyKeys());
@@ -161,8 +151,8 @@ public class GroupOrderTest extends CompilerTestBase {
 		Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
 		
 		// check that the local group orderings are correct
-		Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
+		Assert.assertEquals(false, c1.getLocalStrategySortOrder()[2]);
+		Assert.assertEquals(false, c2.getLocalStrategySortOrder()[2]);
+		Assert.assertEquals(true, c2.getLocalStrategySortOrder()[3]);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 52e9a2d..adca504 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -18,21 +18,16 @@
 
 package org.apache.flink.optimizer;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
 /**
@@ -41,7 +36,7 @@ import org.junit.Test;
  *   <li> Ticket 158
  * </ul>
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class HardPlansCompilationTest extends CompilerTestBase {
 	
 	/**
@@ -54,27 +49,21 @@ public class HardPlansCompilationTest extends CompilerTestBase {
 	@Test
 	public void testTicket158() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		MapOperator map = MapOperator.builder(new IdentityMap()).name("Map1").input(source).build();
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build();
-		
-		CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build();
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build();
-		
-		CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build();
-		
-		ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setInput(reduce3);
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+
+		set1.map(new IdentityMapper<Long>()).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+				.cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
+				.cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
+				.output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileNoStats(plan);
+
 		JobGraphGenerator jobGen = new JobGraphGenerator();
 		jobGen.compileJobGraph(oPlan);
 	}
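
The withForwardedFieldsFirst("*") annotation in the ported plan above declares that Cross1 forwards all fields of its first input unchanged, so data properties established upstream stay valid behind it. A minimal, stand-alone sketch of the forwarded-fields mechanism follows; the Tuple2 input and the IdMap function are hypothetical and not taken from the commit.

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.io.DiscardingOutputFormat;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class ForwardedFieldsSketch {

		// an identity map; without an annotation the optimizer must assume it may change any field
		public static class IdMap implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
			@Override
			public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
				return value;
			}
		}

		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

			DataSet<Tuple2<Long, Long>> input =
					env.fromElements(new Tuple2<Long, Long>(1L, 2L));

			input
				.partitionByHash(0)
				// "0" declares that field 0 passes through unchanged, so the hash
				// partitioning on field 0 can, in principle, be reused for the groupBy
				.map(new IdMap()).withForwardedFields("0")
				.groupBy(0).sum(1)
				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

			env.execute("forwarded fields sketch");
		}
	}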

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index 0afbe93..269be6e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -72,7 +73,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 					.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
 					.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
 			
-			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
+			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			OptimizedPlan p = compileNoStats(env.createProgramPlan());
 			
@@ -104,7 +106,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
 			
-			depResult.print();
+			depResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -120,7 +122,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -140,7 +141,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
 			
-			depResult.print();
+			depResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -156,7 +157,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -176,7 +176,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
 			
-			secondResult.print();
+			secondResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -192,7 +192,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -208,7 +207,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
 			
-			doBulkIteration(input1, input2).print();
+			doBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -226,7 +225,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -253,7 +251,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			initialWorkset
 				.join(result, JoinHint.REPARTITION_HASH_FIRST)
 				.where(0).equalTo(0)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			compileNoStats(p);
@@ -295,7 +293,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -348,7 +346,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 				.flatMap(new FlatMapJoin());
 		
 		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-		
+
 		return depResult;
 		
 	}
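
Throughout these ported tests, print() sinks are replaced with DiscardingOutputFormat, which gives each program a proper data sink that simply drops every record instead of producing output. A tiny usage sketch, with made-up input data:

	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.io.DiscardingOutputFormat;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class DiscardingSinkSketch {
		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

			DataSet<Tuple2<Long, Long>> result =
					env.fromElements(new Tuple2<Long, Long>(1L, 2L));

			// a sink that accepts every record and writes nothing; the plan is complete
			// without producing the console output a print() sink would
			result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

			env.execute("discarding sink sketch");
		}
	}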

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index 34fc085..3a51451 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -52,7 +53,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
 			
-			outerResult.print();
+			outerResult.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -88,7 +89,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
 			
-			outerResult.print();
+			outerResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -126,7 +127,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Long> mainResult = mainIteration.closeWith(joined);
 			
-			mainResult.print();
+			mainResult.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -164,7 +165,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
 			
-			mainResult.print();
+			mainResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
index 8236f10..9ddff33 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.optimizer;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -24,22 +28,13 @@ import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Visitor;
 import org.junit.Test;
 
@@ -50,7 +45,7 @@ import org.junit.Test;
  *       parallelism between tasks is increased or decreased.
  * </ul>
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ParallelismChangeTest extends CompilerTestBase {
 	
 	/**
@@ -62,34 +57,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+					.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+					.withForwardedFields("*").setParallelism(p * 2).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -116,33 +101,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
@@ -170,34 +146,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
-		final int degOfPar = 2 * DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM * 2;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -217,38 +183,27 @@ public class ParallelismChangeTest extends CompilerTestBase {
 				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
 	}
 	
-	
-	
 	@Test
 	public void checkPropertyHandlingWithDecreasingParallelism() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar * 2);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar * 2);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar * 2);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+
+		env
+			.generateSequence(0, 1).setParallelism(p * 2)
+			.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Map1")
+			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
+			.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map2")
+			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce2")
+			.output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 
@@ -284,40 +239,29 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithTwoInputs() {
+
 		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
 
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-			.input1(redA)
-			.input2(redB)
-			.build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-		
-		sourceA.setParallelism(5);
-		sourceB.setParallelism(7);
-		redA.setParallelism(5);
-		redB.setParallelism(7);
-		
-		mat.setParallelism(5);
-		
-		sink.setParallelism(5);
-		
-		
-		// return the PACT plan
-		Plan plan = new Plan(sink, "Partition on DoP Change");
-		
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(5);
+		DataSet<Long> set2 = env.generateSequence(0,1).setParallelism(7);
+
+		DataSet<Long> reduce1 = set1
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(5);
+		DataSet<Long> reduce2 = set2
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(7);
+
+		reduce1.join(reduce2).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).setParallelism(5)
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(5);
+
+		JavaPlan plan = env.createProgramPlan();
+		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
-		
+
 		JobGraphGenerator jobGen = new JobGraphGenerator();
 		
 		//Compile plan to verify that no error is thrown

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
index 72effc1..365726d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -46,7 +47,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
 			input
 				.groupBy(0, 1).sum(2)
 				.groupBy(0).sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -82,7 +83,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
 			input
 				.groupBy(0).sum(1)
 				.groupBy(0, 1).sum(2)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

