flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [FLINK-1418] [apis] Fix eager print() and adjust all tests and examples to not fail due to "eager" print method
Date Thu, 21 May 2015 13:40:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master 939e3fc40 -> ad1d9362c


http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index ff429b8..65b9756 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -55,7 +56,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			
 			input1
 				.join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -114,7 +115,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			input1
 				.join(input2, JoinHint.REPARTITION_HASH_FIRST)
 				.where("b").equalTo("a").withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -176,7 +177,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 				.where(new Pojo2KeySelector())
 				.equalTo(new Pojo3KeySelector())
 				.withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -248,7 +249,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			grouped
 				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
 				.with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
index ab83dba..25b17f8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -54,7 +55,7 @@ public class DeltaIterationDependenciesTest extends CompilerTestBase {
 
 			DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
 
-			result.print();
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index 96758b1..e5b6ad5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -48,7 +49,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 			data.distinct(0)
 				.groupBy(0)
 				.sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -84,7 +85,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 			data.distinct(1)
 				.groupBy(0)
 				.sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 8fb4ef0..6b49dd4 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -51,7 +52,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
 				public void reduce(Iterable<Double> values, Collector<Double> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Double>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -97,7 +98,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -148,7 +149,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +200,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -257,7 +258,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -317,7 +318,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 796d4ab..bcfb2ef 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -48,7 +49,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			env.setParallelism(43);
 			
 			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
-			iteration.closeWith(iteration).print();
+			iteration.closeWith(iteration).output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -76,7 +77,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 					
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
 			iter.closeWith(iter.getWorkset(), iter.getWorkset())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -99,7 +100,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			
 			iteration.closeWith(
 					iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
-					.print();
+					.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -150,7 +151,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 				.union(
 						iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
 				)
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index 14d863d..b3718b0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -131,7 +133,7 @@ public class JoinTranslationTest extends CompilerTestBase {
 		DataSet<Long> i1 = env.generateSequence(1, 1000);
 		DataSet<Long> i2 = env.generateSequence(1, 1000);
 		
-		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 		
 		Plan plan = env.createProgramPlan();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
index 3f18e62..e1e6b5f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,7 +46,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Long>());
 			
 			try {
 				env.createProgramPlan();
@@ -72,9 +73,9 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
 			
-			iteration.closeWith(mapped).print();
+			iteration.closeWith(mapped).output(new DiscardingOutputFormat<Long>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -104,7 +105,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			try {
 				env.createProgramPlan();
@@ -132,7 +133,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			try {
 				env.createProgramPlan();
@@ -164,7 +165,7 @@ public class OpenIterationTest extends CompilerTestBase {
 												.where(0).equalTo(0).projectFirst(1).projectSecond(0);
 			
 			iteration.closeWith(joined, joined)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 9c2d0d2..7f5c209 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -49,8 +50,8 @@ public class PartitionOperatorTest extends CompilerTestBase {
 					public int partition(Long key, int numPartitions) { return key.intValue(); }
 				}, 1)
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
-				.print();
+					.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 2958f1a..942aa47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -53,7 +54,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Double>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -98,7 +99,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Long>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -151,7 +152,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -211,7 +212,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 46eb48a..3d6d90b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -294,7 +295,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 				joinedWithSolutionSet;
 		
 		iter.closeWith(nextSolutionSet, nextWorkset)
-			.print();
+			.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		
 		return env.createProgramPlan();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index fb7a80f..b23bf35 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.Optimizer;
@@ -48,7 +49,7 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
 			input.coGroup(input).where(0).equalTo(0)
 				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
 				.withParameters(cfg)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 8a4786f..a4e520b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.Optimizer;
@@ -46,7 +47,7 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
 			
 			input.join(input).where(0).equalTo(0)
 				.withParameters(cfg)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index efa1e88..6cc327a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -131,6 +131,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.getId
   }
 
+
+  /**
+   * Retrieves the JobExecutionResult from the last job execution (for "eager" print).
+   * @return JobExecutionResult from the last job execution
+   */
+  def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult
+
   /**
    * Gets the UUID by which this environment is identified, as a string.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index a1bd2e0..b4a27b6 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -19,6 +19,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
@@ -296,7 +297,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 		int parentID = (Integer) receiver.getRecord(true);
 		DataSet parent = (DataSet) sets.get(parentID);
 		boolean toError = (Boolean) receiver.getRecord();
-		(toError ? parent.printToErr() : parent.print()).name("PrintSink");
+		parent.output(new PrintingOutputFormat(toError));
 	}
 
 	private void createBroadcastVariable() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index d0b0164..018daf8 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -59,7 +60,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
 				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
 				
-				result.print();
+				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			}
 			
 			Plan p = env.createProgramPlan("Spargel Connected Components");
@@ -134,7 +135,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				
 				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
 				
-				result.print();
+				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			}
 			
 			Plan p = env.createProgramPlan("Spargel Connected Components");

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
index 2840914..7189bbe 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 
 public class CollectionTestEnvironment extends CollectionEnvironment {
 
-	protected JobExecutionResult latestResult;
-
 	@Override
 	public JobExecutionResult execute() throws Exception {
 		return execute("test job");
@@ -35,7 +33,7 @@ public class CollectionTestEnvironment extends CollectionEnvironment {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		JobExecutionResult result = super.execute(jobName);
-		this.latestResult = result;
+		this.lastJobExecutionResult = result;
 		return result;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 2214000..e639c80 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -119,7 +119,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 				// call the test program
 				try {
 					testProgram();
-					this.latestExecutionResult = env.latestResult;
+					this.latestExecutionResult = env.getLastJobExecutionResult();
 				}
 				catch (Exception e) {
 					System.err.println(e.getMessage());
@@ -171,7 +171,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 				// call the test program
 				try {
 					testProgram();
-					this.latestExecutionResult = env.latestResult;
+					this.latestExecutionResult = env.getLastJobExecutionResult();
 				}
 				catch (Exception e) {
 					System.err.println(e.getMessage());
@@ -224,7 +224,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		// call the test program
 		try {
 			testProgram();
-			this.latestExecutionResult = env.latestResult;
+			this.latestExecutionResult = env.getLastJobExecutionResult();
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index cf1caeb..25f2c83 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -36,9 +36,6 @@ public class TestEnvironment extends ExecutionEnvironment {
 
 	private final ForkableFlinkMiniCluster executor;
 
-	protected JobExecutionResult latestResult;
-
-
 	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
@@ -54,8 +51,8 @@ public class TestEnvironment extends ExecutionEnvironment {
 			
 			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
 
-			this.latestResult = result.toJobExecutionResult(getClass().getClassLoader());
-			return this.latestResult;
+			this.lastJobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
+			return this.lastJobExecutionResult;
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index 27c1644..aea448f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -58,8 +59,8 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
 			
 			// add two sinks, to test the case of branching after an iteration
-			result.print();
-			result.print();
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
 		
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 4edd68e..a3b7572 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -23,6 +23,7 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -85,7 +86,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
 					// termination condition
 					.filter(new EpsilonFilter()));
 	
-			finalPageRanks.print();
+			finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
 	
 			// get the plan and compile it
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index 25cc089..2775d09 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Test
 import org.apache.flink.api.common.InvalidProgramException
 
@@ -37,7 +38,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int, String)])
   }
 
   @Test
@@ -51,7 +52,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -65,7 +66,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -79,7 +80,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()  }
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
 
   @Test(expected = classOf[InvalidProgramException])
   def testIncorrectJoinWithSolution3(): Unit = {
@@ -92,7 +94,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
    }
 
   @Test
@@ -106,7 +108,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test
@@ -120,7 +122,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -134,7 +136,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -148,7 +150,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()  }
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
 
   @Test(expected = classOf[InvalidProgramException])
   def testIncorrectCoGroupWithSolution3(): Unit = {
@@ -161,6 +164,6 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 3fefa01..97a0f87 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.compiler
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Test
 import org.junit.Assert._
@@ -38,8 +39,8 @@ class PartitionOperatorTranslationTest extends CompilerTestBase {
           def partition(key: Long, numPartitions: Int): Int = key.intValue()
         }, 1)
         .groupBy(1).reduceGroup( x => x)
-        .print()
-      
+        .output(new DiscardingOutputFormat[Iterator[(Long, Long)]])
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
       

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index eecc347..cc2c81e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.functions
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert._
 import org.apache.flink.api.common.functions.RichJoinFunction
 import org.apache.flink.api.common.functions.RichMapFunction
@@ -46,7 +47,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new WildcardForwardMapper[(Long, String, Int)]).print()
+      input.map(new WildcardForwardMapper[(Long, String, Int)])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -83,7 +85,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new IndividualForwardMapper[Long, String, Int]).print()
+      input.map(new IndividualForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -120,7 +123,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new FieldTwoForwardMapper[Long, String, Int]).print()
+      input.map(new FieldTwoForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -160,7 +164,8 @@ class SemanticPropertiesTranslationTest {
       val input2 = env.fromElements((3L, 3.1415))
 
       input1.join(input2).where(0).equalTo(0)(
-        new ForwardingTupleJoin[Long, String, Long, Double]).print()
+        new ForwardingTupleJoin[Long, String, Long, Double])
+        .output(new DiscardingOutputFormat[(String, Long)])
 
       val plan = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
@@ -204,7 +209,8 @@ class SemanticPropertiesTranslationTest {
       val input2 = env.fromElements((3L, 42))
 
       input1.join(input2).where(0).equalTo(0)(
-        new ForwardingBasicJoin[(Long, String), (Long, Int)]).print()
+        new ForwardingBasicJoin[(Long, String), (Long, Int)])
+        .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
 
       val plan = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
index 425cff6..6babbe7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
 
 import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.Test
@@ -37,7 +38,8 @@ class AggregateTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L))
 
-      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print()
+      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2)
+        .output(new DiscardingOutputFormat[(Double, String, Long)])
 
       val p: Plan = env.createProgramPlan()
       val sink = p.getDataSinks.iterator.next
@@ -55,6 +57,7 @@ class AggregateTranslationTest {
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Test caused an error: " + e.getMessage)
+
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 8d75f2e..4d85c58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +47,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where(1).equalTo(0)
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[(Long, Long)], Array[(Long, Long, Long)])])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where("b").equalTo("a")
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
         
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where( _.a ).equalTo( _.b )
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 8d816ee..a0f93dd 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -49,7 +50,7 @@ class CoGroupGroupSortTranslationTest {
           .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) {
                (first, second) => first.buffered.head
             }
-        .print()
+        .output(new DiscardingOutputFormat[(Long, Long)])
         
       val p = env.createProgramPlan()
       
@@ -92,7 +93,7 @@ class CoGroupGroupSortTranslationTest {
           .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) {
                (first, second) => first.buffered.head
             }
-          .print()
+          .output(new DiscardingOutputFormat[(Long, Long)])
           
       val p = env.createProgramPlan()
       

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 395f36a..f81cb84 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -40,7 +41,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
       data
         .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
         .reduce( (a,b) => a )
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
       data
         .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
         .reduce( (a, b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -107,7 +108,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .withPartitioner(new TestPartitionerInt())
         .sortGroup(1, Order.ASCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -141,7 +142,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .withPartitioner(new TestPartitionerInt())
         .sortGroup(_._2, Order.ASCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -175,7 +176,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .sortGroup(1, Order.ASCENDING)
         .sortGroup(2, Order.DESCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index a02d2af..6e40ea5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -41,7 +42,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
       data
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .reduce( (a,b) => a )
-          .print()
+          .output(new DiscardingOutputFormat[Pojo2])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -72,7 +73,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
       data
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo2]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -102,7 +103,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .sortGroup("b", Order.ASCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo3]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -133,7 +134,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
           .sortGroup("b", Order.ASCENDING)
           .sortGroup("c", Order.DESCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo4]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index 25efe48..b103e9c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert._
 import org.junit.Test
 
@@ -42,7 +43,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data.groupBy(0)
           .withPartitioner(new TestPartitionerInt())
           .sum(1)
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .reduce( (a,b) => a )
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -104,7 +105,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int)]])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -134,7 +135,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .sortGroup(1, Order.ASCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int, Int)]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -165,7 +166,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
           .sortGroup(1, Order.ASCENDING)
           .sortGroup(2, Order.DESCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int, Int, Int)]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index fe30376..7ebf378 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Test
@@ -43,7 +44,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       data.partitionCustom(part, 0)
           .mapPartition( x => x )
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -113,7 +114,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       data
           .partitionCustom(part, "a")
           .mapPartition( x => x)
-          .print()
+          .output(new DiscardingOutputFormat[Pojo])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -184,7 +185,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       data
           .partitionCustom(part, pojo => pojo.a)
           .mapPartition( x => x)
-          .print()
+          .output(new DiscardingOutputFormat[Pojo])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 6aa4d75..9a400c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.operators.translation
 import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
 RichJoinFunction}
 import org.apache.flink.api.common.operators.GenericDataSinkBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.operators.translation.WrappingFunction
 import org.junit.Assert.assertArrayEquals
 import org.junit.Assert.assertEquals
@@ -67,7 +68,7 @@ class DeltaIterationTranslationTest {
         .setParallelism(ITERATION_PARALLELISM)
         .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
 
-      result.print()
+      result.output(new DiscardingOutputFormat[(Double, Long, String)])
       result.writeAsText("/dev/null")
 
       val p: Plan = env.createProgramPlan(JOB_NAME)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
index 7836400..c540f61 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.operators.translation
 
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert
 import org.junit.Test
 
@@ -31,7 +32,7 @@ class DistinctTranslationTest {
       val input = env.fromElements("1", "2", "1", "3")
 
       val op = input.distinct { x => x}
-      op.print()
+      op.output(new DiscardingOutputFormat[String])
 
       val p = env.createProgramPlan()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 2467596..eae3db1 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +47,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where(1).equalTo(0)
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[((Long, Long), (Long, Long, Long))])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where("b").equalTo("a")
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
         
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where( _.a ).equalTo( _.b )
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
index e97fc21..5d3878c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.operators.translation
 
 import org.apache.flink.api.common.operators.{GenericDataSourceBase, GenericDataSinkBase}
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
 PlanUnwrappingReduceOperator}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -39,7 +40,8 @@ class ReduceTranslationTest {
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
 
-      initialData reduce { (v1, v2) => v1 } print()
+      initialData reduce { (v1, v2) => v1 } output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan(
 
@@ -70,7 +72,8 @@ class ReduceTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
-      initialData.groupBy(2) reduce { (v1, v2) => v1 } print()
+      initialData.groupBy(2) reduce { (v1, v2) => v1 } output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan()
 
@@ -99,7 +102,8 @@ class ReduceTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
-      initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) print()
+      initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 07300da..e22e0ef 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
@@ -445,13 +446,32 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		yc.init(yarnConfiguration);
 		yc.start();
 
+		// get temporary folder for writing output of wordcount example
+		File tmpOutFolder = null;
+		try{
+			tmpOutFolder = tmp.newFolder();
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		// get temporary file for reading input data for wordcount example
+		File tmpInFile = null;
+		try{
+			tmpInFile = tmp.newFile();
+			FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
 		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
 						"-yn", "1",
 						"-yjm", "512",
 						"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
 						"-ytm", "1024",
 						"-ys", "2", // test requesting slots from YARN.
-						"--yarndetached", job},
+						"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
 				"The Job has been submitted with JobID",
 				RunTypes.CLI_FRONTEND);
 
@@ -490,19 +510,26 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			final ApplicationId id = tmpAppId;
 
 			// now it has finished.
-			// check the output.
-			File taskmanagerOut = YarnTestBase.findFile("..", new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.contains("taskmanager") && name.contains("stdout") && dir.getAbsolutePath().contains(id.toString());
+			// check the output files.
+			File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
+			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
+
+			// read all output files in output folder to one output string
+			String content = "";
+			for(File f:listOfOutputFiles)
+			{
+				if(f.isFile())
+				{
+					content += FileUtils.readFileToString(f) + "\n";
 				}
-			});
-			Assert.assertNotNull("Taskmanager output not found", taskmanagerOut);
-			LOG.info("The job has finished. TaskManager output file found {}", taskmanagerOut.getAbsolutePath());
-			String content = FileUtils.readFileToString(taskmanagerOut);
+			}
+			//String content = FileUtils.readFileToString(taskmanagerOut);
 			// check for some of the wordcount outputs.
-			Assert.assertTrue("Expected string '(all,2)' not found in string '"+content+"'", content.contains("(all,2)"));
-			Assert.assertTrue("Expected string '(mind,1)' not found in string'"+content+"'", content.contains("(mind,1)"));
+			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
+			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
 
 			// check if the heap size for the TaskManager was set correctly
 			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {


Mime
View raw message