flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/5] flink git commit: [FLINK-2264] [gelly] changed the tests to use collect instead of temp files
Date Thu, 25 Jun 2015 21:25:41 GMT
[FLINK-2264] [gelly] changed the tests to use collect instead of temp files

This closes #863


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/872c3fc4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/872c3fc4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/872c3fc4

Branch: refs/heads/master
Commit: 872c3fc4e191f26e3182d33f7c6e688d406cbeb7
Parents: fe5bbdb
Author: Samia <samia>
Authored: Wed Jun 24 12:41:34 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Thu Jun 25 22:58:17 2015 +0200

----------------------------------------------------------------------
 .../test/GatherSumApplyConfigurationITCase.java |  56 ++---
 .../test/VertexCentricConfigurationITCase.java  | 211 ++++++++++---------
 .../graph/test/operations/CompareResults.java   |  75 +++++++
 .../graph/test/operations/DegreesITCase.java    |  71 ++++---
 .../test/operations/FromCollectionITCase.java   |  44 ++--
 .../test/operations/GraphCreationITCase.java    |  69 +++---
 .../GraphCreationWithMapperITCase.java          |  50 ++---
 .../test/operations/GraphMutationsITCase.java   | 153 +++++++++-----
 .../test/operations/GraphOperationsITCase.java  | 110 ++++++----
 .../test/operations/JoinWithEdgesITCase.java    | 144 +++++++------
 .../test/operations/JoinWithVerticesITCase.java |  61 +++---
 .../graph/test/operations/MapEdgesITCase.java   |  50 ++---
 .../test/operations/MapVerticesITCase.java      |  53 ++---
 .../operations/ReduceOnEdgesMethodsITCase.java  |  96 ++++-----
 .../ReduceOnNeighborMethodsITCase.java          | 115 +++++-----
 15 files changed, 729 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 5befafe..4ff14a2 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -31,14 +31,11 @@ import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.test.operations.CompareResults;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -51,22 +48,8 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		super(mode);
 	}
 
-	private String resultPath;
 	private String expectedResult;
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
 	@Test
 	public void testRunWithConfiguration() throws Exception {
 		/*
@@ -86,17 +69,19 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
 		parameters.setOptNumVertices(true);
 
-		Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
+		Graph<Long, Long, Long> res = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
 				new Apply(), 10, parameters);
 
-		result.getVertices().writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        DataSet<Vertex<Long, Long>> data = res.getVertices();
+        List<Vertex<Long, Long>> result= data.collect();
 
-		expectedResult = "1	11\n" +
-				"2	11\n" +
-				"3	11\n" +
-				"4	11\n" +
-				"5	11";
+		expectedResult = "1,11\n" +
+				"2,11\n" +
+				"3,11\n" +
+				"4,11\n" +
+				"5,11";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -122,15 +107,16 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
 		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
-		DataSet<Vertex<Long, Long>> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
-
-		result.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
-		expectedResult = "1	11\n" +
-				"2	12\n" +
-				"3	13\n" +
-				"4	14\n" +
-				"5	15";
+		DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+        List<Vertex<Long, Long>> result= data.collect();
+        
+		expectedResult = "1,11\n" +
+				"2,12\n" +
+				"3,13\n" +
+				"4,14\n" +
+				"5,15";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 567c015..5732adb 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
@@ -35,14 +36,11 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricConfiguration;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.test.operations.CompareResults;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -54,22 +52,8 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
 	@Test
 	public void testRunWithConfiguration() throws Exception {
 		/*
@@ -88,16 +72,19 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
 		parameters.setOptNumVertices(true);
 
-		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+		Graph<Long, Long, Long> res = graph.runVertexCentricIteration(
 				new UpdateFunction(), new MessageFunction(), 10, parameters);
 
-		result.getVertices().writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
-		expectedResult = "1	11\n" +
-						"2	11\n" +
-						"3	11\n" +
-						"4	11\n" +
-						"5	11";
+		DataSet<Vertex<Long,Long>> data = res.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
+        
+		expectedResult = "1,11\n" +
+						"2,11\n" +
+						"3,11\n" +
+						"4,11\n" +
+						"5,11";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -123,15 +110,16 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
 		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
-		DataSet<Vertex<Long, Long>> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+		DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+        List<Vertex<Long,Long>> result= data.collect();
+        
+		expectedResult = "1,11\n" +
+						"2,12\n" +
+						"3,13\n" +
+						"4,14\n" +
+						"5,15";
 		
-		result.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
-		expectedResult = "1	11\n" +
-						"2	12\n" +
-						"3	13\n" +
-						"4	14\n" +
-						"5	15";
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -145,16 +133,20 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
 
-		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+		Graph<Long, Long, Long> res = graph.runVertexCentricIteration(
 				new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
 
-		result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
-		expectedResult = "1	6\n" +
-						"2	6\n" +
-						"3	6\n" +
-						"4	6\n" +
-						"5	6";
+		
+		DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>());
+        List<Tuple2<Long, Long>> result= data.collect();
+        
+		expectedResult = "1,6\n" +
+						"2,6\n" +
+						"3,6\n" +
+						"4,6\n" +
+						"5,6";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -174,14 +166,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5)
 				.getVertices();
 
-		resultedVertices.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
 
-		expectedResult = "1	[5]\n" +
-				"2	[1]\n" +
-				"3	[1, 2]\n" +
-				"4	[3]\n" +
-				"5	[3, 4]";
+		expectedResult = "1,[5]\n" +
+				"2,[1]\n" +
+				"3,[1, 2]\n" +
+				"4,[3]\n" +
+				"5,[3, 4]";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -206,14 +199,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters)
 				.getVertices();
 
-		resultedVertices.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
 
-		expectedResult = "1	[2, 3]\n" +
-				"2	[3]\n" +
-				"3	[4, 5]\n" +
-				"4	[5]\n" +
-				"5	[1]";
+		expectedResult = "1,[2, 3]\n" +
+				"2,[3]\n" +
+				"3,[4, 5]\n" +
+				"4,[5]\n" +
+				"5,[1]";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -238,14 +232,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
 				.getVertices();
 
-		resultedVertices.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
 
-		expectedResult = "1	[2, 3, 5]\n" +
-				"2	[1, 3]\n" +
-				"3	[1, 2, 4, 5]\n" +
-				"4	[3, 5]\n" +
-				"5	[1, 3, 4]";
+		expectedResult = "1,[2, 3, 5]\n" +
+				"2,[1, 3]\n" +
+				"3,[1, 2, 4, 5]\n" +
+				"4,[3, 5]\n" +
+				"5,[1, 3, 4]";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -262,14 +257,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
 				new DummyMessageFunction(), 2).getVertices();
 
-		verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
 
-		expectedResult = "1	-1\n" +
-				"2	-1\n" +
-				"3	-1\n" +
-				"4	-1\n" +
-				"5	-1";
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -293,14 +289,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
-		expectedResult = "1	1\n" +
-				"2	1\n" +
-				"3	2\n" +
-				"4	1\n" +
-				"5	2";
+		expectedResult = "1,1\n" +
+				"2,1\n" +
+				"3,2\n" +
+				"4,1\n" +
+				"5,2";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -317,14 +314,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
-		expectedResult = "1	-1\n" +
-				"2	-1\n" +
-				"3	-1\n" +
-				"4	-1\n" +
-				"5	-1";
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -348,14 +346,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
-		expectedResult = "1	2\n" +
-				"2	1\n" +
-				"3	2\n" +
-				"4	1\n" +
-				"5	1";
+		expectedResult = "1,2\n" +
+				"2,1\n" +
+				"3,2\n" +
+				"4,1\n" +
+				"5,1";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -372,14 +371,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
-		expectedResult = "1	-1\n" +
-				"2	-1\n" +
-				"3	-1\n" +
-				"4	-1\n" +
-				"5	-1";
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -403,14 +403,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runVertexCentricIteration(
 				new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices();
 
-		verticesWithNumNeighbors.writeAsCsv(resultPath, "\n", "\t");
-		env.execute();
+        List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
 
-		expectedResult = "1	true\n" +
-				"2	true\n" +
-				"3	true\n" +
-				"4	true\n" +
-				"5	true";
+		expectedResult = "1,true\n" +
+				"2,true\n" +
+				"3,true\n" +
+				"4,true\n" +
+				"5,true";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java
new file mode 100644
index 0000000..2372f55
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.test.operations;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class CompareResults {
+	
+	public static <T> void compareResultAsTuples(List<T> result, String expected) {
+		compareResult(result, expected, true);
+	}
+
+	public static <T> void compareResultAsText(List<T> result, String expected) {
+		compareResult(result, expected, false);
+	}
+	
+	private static <T> void compareResult(List<T> result, String expected, boolean asTuples) {
+		String[] extectedStrings = expected.split("\n");
+		String[] resultStrings = new String[result.size()];
+		
+		for (int i = 0; i < resultStrings.length; i++) {
+			T val = result.get(i);
+			
+			if (asTuples) {
+				if (val instanceof Tuple) {
+					Tuple t = (Tuple) val;
+					Object first = t.getField(0);
+					StringBuilder bld = new StringBuilder(first == null ? "null" : first.toString());
+					for (int pos = 1; pos < t.getArity(); pos++) {
+						Object next = t.getField(pos);
+						bld.append(',').append(next == null ? "null" : next.toString());
+					}
+					resultStrings[i] = bld.toString();
+				}
+				else {
+					throw new IllegalArgumentException(val + " is no tuple");
+				}
+			}
+			else {
+				resultStrings[i] = (val == null) ? "null" : val.toString();
+			}
+		}
+		
+		assertEquals("Wrong number of elements result", extectedStrings.length, resultStrings.length);
+
+		Arrays.sort(extectedStrings);
+		Arrays.sort(resultStrings);
+		
+		for (int i = 0; i < extectedStrings.length; i++) {
+			assertEquals(extectedStrings[i], resultStrings[i]);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
index 0391dcb..50f97da 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.graph.test.operations;
 
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -38,21 +37,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
 
 	@Test
 	public void testOutDegrees() throws Exception {
@@ -64,14 +50,18 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
+        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+       
+        
         expectedResult = "1,2\n" +
                     "2,1\n" +
                     "3,2\n" +
                     "4,1\n" +
                     "5,1\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
+        
     }
 
 	@Test
@@ -84,14 +74,18 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
+        
+        
+        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+        
         expectedResult = "1,3\n" +
                 "2,1\n" +
                 "3,1\n" +
                 "4,1\n" +
                 "5,0\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -104,13 +98,16 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 	    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 	            TestGraphUtils.getLongLongEdgeData(env), env);
 
-	    graph.inDegrees().writeAsCsv(resultPath);
-	    env.execute();
+
+        DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+	    
 	    expectedResult = "1,1\n" +
 		            "2,1\n" +
 		            "3,2\n" +
 		            "4,1\n" +
 		            "5,2\n";
+	    CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -123,13 +120,16 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
-        graph.inDegrees().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+        
         expectedResult = "1,0\n" +
 	                "2,1\n" +
 	                "3,1\n" +
 	                "4,1\n" +
 	                "5,3\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -142,13 +142,16 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        graph.getDegrees().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Tuple2<Long,Long>> data =graph.getDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+        
         expectedResult = "1,3\n" +
 	                "2,2\n" +
 	                "3,4\n" +
 	                "4,2\n" +
 	                "5,3\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -161,12 +164,18 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, NullValue, Long> graph =
                 Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
 
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
+        List<Tuple2<Long,Long>> result= data.collect();
+        
         expectedResult = "1,2\n" +
                 "2,1\n" +
                 "3,0\n" +
                 "4,1\n" +
                 "5,0\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
+	
+	
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
index 5d4b7d7..87a66ba 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
@@ -18,17 +18,18 @@
 
 package org.apache.flink.graph.test.operations;
 
+
+import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -39,21 +40,8 @@ public class FromCollectionITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
 
 	@Test
 	public void testFromCollectionVerticesEdges() throws Exception {
@@ -64,8 +52,9 @@ public class FromCollectionITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
                 TestGraphUtils.getLongLongEdges(), env);
 
-        graph.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
+        
         expectedResult = "1,2,12\n" +
 	                "1,3,13\n" +
 	                "2,3,23\n" +
@@ -73,6 +62,8 @@ public class FromCollectionITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -84,13 +75,17 @@ public class FromCollectionITCase extends MultipleProgramsTestBase {
         Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
         		env);
 
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
+        
+        DataSet<Vertex<Long,NullValue>> data = graph.getVertices();
+        List<Vertex<Long,NullValue>> result= data.collect();
+        
         expectedResult = "1,(null)\n" +
 	                "2,(null)\n" +
 	                "3,(null)\n" +
 	                "4,(null)\n" +
 	                "5,(null)\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -103,13 +98,16 @@ public class FromCollectionITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
                 new InitVerticesMapper(), env);
 
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
+        
         expectedResult = "1,2\n" +
 	                "2,4\n" +
 	                "3,6\n" +
 	                "4,8\n" +
 	                "5,10\n";
+        
+        CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index 3fe69fd..abdde0d 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.graph.test.operations;
 
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,11 +32,7 @@ import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -44,21 +43,8 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
 
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
+    private String expectedResult;
 
 	@Test
 	public void testCreateWithoutVertexValues() throws Exception {
@@ -68,13 +54,16 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Vertex<Long,NullValue>> data = graph.getVertices();
+        List<Vertex<Long,NullValue>> result= data.collect();
+        
 		expectedResult = "1,(null)\n" +
 					"2,(null)\n" +
 					"3,(null)\n" +
 					"4,(null)\n" +
 					"5,(null)\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -86,13 +75,16 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
 				new AssignIdAsValueMapper(), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
+        
 		expectedResult = "1,1\n" +
 					"2,2\n" +
 					"3,3\n" +
 					"4,4\n" +
 					"5,5\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -104,13 +96,16 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
 				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Vertex<Long,DummyCustomParameterizedType<Double>>> data = graph.getVertices();
+        List<Vertex<Long,DummyCustomParameterizedType<Double>>> result= data.collect();
+        
 		expectedResult = "1,(2.0,0)\n" +
 				"2,(4.0,1)\n" +
 				"3,(6.0,2)\n" +
 				"4,(8.0,3)\n" +
 				"5,(10.0,4)\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -123,12 +118,16 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-
-		env.fromElements(result).writeAsText(resultPath);
-		env.execute();
-
-		expectedResult = "true\n";
+		Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+		//env.fromElements(result).writeAsText(resultPath);
+		
+		String res= valid.toString();//env.fromElements(valid);
+        List<String> result= new LinkedList<String>();
+        result.add(res);
+		expectedResult = "true";
+		
+		CompareResults.compareResultAsText(result, expectedResult);
 	}
 
 	@Test
@@ -141,11 +140,15 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-		env.fromElements(result).writeAsText(resultPath);
-		env.execute();
+		Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+		
+		String res= valid.toString();//env.fromElements(valid);
+        List<String> result= new LinkedList<String>();
+        result.add(res);
 
 		expectedResult = "false\n";
+		
+		CompareResults.compareResultAsText(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
index 23d05ac..523cee5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.graph.test.operations;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -40,21 +40,8 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
 
 	@Test
 	public void testWithDoubleValueMapper() throws Exception {
@@ -65,13 +52,16 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 		Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
 				new AssignDoubleValueMapper(), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Vertex<Long,Double>> data = graph.getVertices();
+        List<Vertex<Long,Double>> result= data.collect();
+		
 		expectedResult = "1,0.1\n" +
 				"2,0.1\n" +
 				"3,0.1\n" +
 				"4,0.1\n" +
 				"5,0.1\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -83,13 +73,16 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 		Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
 				TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Vertex<Long, Tuple2<Long, Long>>> data = graph.getVertices();
+        List<Vertex<Long, Tuple2<Long, Long>>> result= data.collect();
+        
 		expectedResult = "1,(2,42)\n" +
 				"2,(4,42)\n" +
 				"3,(6,42)\n" +
 				"4,(8,42)\n" +
 				"5,(10,42)\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -102,13 +95,17 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 	Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
 			new AssignDoubleConstantMapper(), env);
 
-	graph.getVertices().writeAsCsv(resultPath);
-	env.execute();
+    DataSet<Vertex<String,Double>> data = graph.getVertices();
+    List<Vertex<String,Double>> result= data.collect();
+    
 	expectedResult = "1,0.1\n" +
 			"2,0.1\n" +
 			"3,0.1\n" +
 			"4,0.1\n" +
 			"5,0.1\n";
+	
+	CompareResults.compareResultAsTuples(result, expectedResult);
+	
 	}
 
 	@Test
@@ -120,13 +117,16 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 		Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
 				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+	    DataSet<Vertex<Long,DummyCustomType>> data = graph.getVertices();
+	    List<Vertex<Long,DummyCustomType>> result= data.collect();
+	    
 		expectedResult = "1,(F,0)\n" +
 				"2,(F,1)\n" +
 				"3,(F,2)\n" +
 				"4,(F,3)\n" +
 				"5,(F,4)\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
index 0d71b97..c6a8012 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
@@ -21,17 +21,14 @@ package org.apache.flink.graph.test.operations;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -42,21 +39,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
 
 	@Test
 	public void testAddVertex() throws Exception {
@@ -70,8 +54,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L));
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+        
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -79,6 +64,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,4\n" +
 				"5,5\n" +
 				"6,6\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -98,8 +85,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.addVertices(vertices);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -108,6 +95,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"5,5\n" +
 				"6,6\n" +
 				"7,7\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -121,14 +110,17 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L));
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+		
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -148,14 +140,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.addVertices(vertices);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -175,8 +169,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.addVertices(vertices);
 
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -184,6 +178,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,4\n" +
 				"5,5\n" +
 				"6,6\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -197,13 +193,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
 				"2,3,23\n" +
 				"3,4,34\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -222,12 +221,15 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		verticesToBeRemoved.add(new Vertex<Long, Long>(2L, 2L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -241,8 +243,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -251,6 +254,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -268,13 +273,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "2,3,23\n" +
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -292,8 +300,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -302,6 +311,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -319,14 +330,17 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Vertex<Long,Long>> data = graph.getVertices();
+        List<Vertex<Long, Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 	
 	@Test
@@ -341,8 +355,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
 				61L);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -352,6 +367,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,5,45\n" +
 				"5,1,51\n" +
 				"6,1,61\n";	
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -371,8 +388,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.addEdges(edgesToBeAdded);
 
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -383,6 +400,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,1,41\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -402,8 +421,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.addEdges(edgesToBeAdded);
 
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -412,6 +431,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -426,8 +447,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
 				12L);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,2,12\n" +
@@ -437,6 +459,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";	
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -450,8 +474,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -459,6 +484,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -476,14 +503,17 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		edgesToBeRemoved.add(new Edge<Long, Long>(2L, 3L, 23L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -501,8 +531,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -510,6 +541,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -523,8 +556,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -533,6 +567,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -550,8 +586,9 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		edgesToBeRemoved.add(new Edge<Long, Long>(6L, 1L, 61L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
+
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -560,5 +597,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index 1b9d5ac..904b58a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -22,17 +22,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -43,22 +42,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
 	@Test
 	public void testUndirected() throws Exception {
 		/*
@@ -69,8 +54,9 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph.getUndirected().getEdges().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Edge<Long,Long>> data = graph.getUndirected().getEdges();
+        List<Edge<Long, Long>> result= data.collect();
+        
 		expectedResult = "1,2,12\n" + "2,1,12\n" +
 					"1,3,13\n" + "3,1,13\n" +
 					"2,3,23\n" + "3,2,23\n" +
@@ -78,6 +64,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 					"3,5,35\n" + "5,3,35\n" +
 					"4,5,45\n" + "5,4,45\n" +
 					"5,1,51\n" + "1,5,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -90,8 +78,9 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph.reverse().getEdges().writeAsCsv(resultPath);
-		env.execute();
+        DataSet<Edge<Long,Long>> data = graph.reverse().getEdges();
+        List<Edge<Long, Long>> result= data.collect();
+        
 		expectedResult = "2,1,12\n" +
 					"3,1,13\n" +
 					"3,2,23\n" +
@@ -99,6 +88,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 					"5,3,35\n" +
 					"5,4,45\n" +
 					"1,5,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")
@@ -111,7 +102,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
+		
+		DataSet<Edge<Long,Long>> data= graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
 						   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
 							   return (vertex.getValue() > 2);
 						   }
@@ -120,11 +112,14 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 					public boolean filter(Edge<Long, Long> edge) throws Exception {
 						return (edge.getValue() > 34);
 					}
-				}).getEdges().writeAsCsv(resultPath);
+				}).getEdges();
 
-		env.execute();
+        List<Edge<Long, Long>> result= data.collect();
+        
 		expectedResult = "3,5,35\n" +
 					"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")
@@ -137,16 +132,20 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
+		
+		DataSet<Edge<Long,Long>> data = graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
 			public boolean filter(Vertex<Long, Long> vertex) throws Exception {
 				return (vertex.getValue() > 2);
 			}
-		}).getEdges().writeAsCsv(resultPath);
+		}).getEdges();
 
-		env.execute();
+        List<Edge<Long, Long>> result= data.collect();
+		
 		expectedResult =  "3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")
@@ -159,16 +158,20 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
+		
+		DataSet<Edge<Long,Long>> data = graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
 			public boolean filter(Edge<Long, Long> edge) throws Exception {
 				return (edge.getValue() > 34);
 			}
-		}).getEdges().writeAsCsv(resultPath);
+		}).getEdges();
 
-		env.execute();
+        List<Edge<Long, Long>> result= data.collect();
+        
 		expectedResult = "3,5,35\n" +
 					"4,5,45\n" +
 					"5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -180,10 +183,13 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		env.fromElements(graph.numberOfVertices()).writeAsText(resultPath);
+		DataSet<Long> data = env.fromElements(graph.numberOfVertices());
 
-		env.execute();
+        List<Long> result= data.collect();
+        
 		expectedResult = "5";
+		
+		CompareResults.compareResultAsText(result, expectedResult);
 	}
 
 	@Test
@@ -195,10 +201,13 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		env.fromElements(graph.numberOfEdges()).writeAsText(resultPath);
+		DataSet<Long> data = env.fromElements(graph.numberOfEdges());
 
-		env.execute();
+        List<Long> result= data.collect();
+        
 		expectedResult = "7";
+		
+		CompareResults.compareResultAsText(result, expectedResult);
 	}
 
 	@Test
@@ -210,10 +219,13 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.getVertexIds().writeAsText(resultPath);
-
-		env.execute();
+		
+		DataSet<Long> data = graph.getVertexIds();
+        List<Long> result= data.collect();
+        
 		expectedResult = "1\n2\n3\n4\n5\n";
+		
+		CompareResults.compareResultAsText(result, expectedResult);
 	}
 
 	@Test
@@ -225,13 +237,16 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.getEdgeIds().writeAsCsv(resultPath);
-
-		env.execute();
+		
+		DataSet<Tuple2<Long,Long>> data = graph.getEdgeIds();
+        List<Tuple2<Long, Long>> result= data.collect();
+        
 		expectedResult = "1,2\n" + "1,3\n" +
 				"2,3\n" + "3,4\n" +
 				"3,5\n" + "4,5\n" +
 				"5,1\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -252,9 +267,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		graph = graph.union(Graph.fromCollection(vertices, edges, env));
 
-		graph.getEdges().writeAsCsv(resultPath);
-
-		env.execute();
+        DataSet<Edge<Long,Long>> data = graph.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 					"1,3,13\n" +
@@ -264,6 +278,8 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 					"4,5,45\n" +
 					"5,1,51\n" +
 					"6,1,61\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 
 	@Test
@@ -276,12 +292,14 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph.getTriplets().writeAsCsv(resultPath);
+        DataSet<Triplet<Long,Long,Long>> data = graph.getTriplets();
+        List<Triplet<Long,Long,Long>> result= data.collect();
 
-		env.execute();
 		expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" +
 				"2,3,2,3,23\n" + "3,4,3,4,34\n" +
 				"3,5,3,5,35\n" + "4,5,4,5,45\n" +
 				"5,1,5,1,51\n";
+		
+		CompareResults.compareResultAsTuples(result, expectedResult);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
index c02f0bb..afdfbf2 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.graph.test.operations;
 
+import java.util.List;
+
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -28,11 +31,7 @@ import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -43,22 +42,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String resultPath;
     private String expectedResult;
 
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
 	@Test
 	public void testWithEdgesInputDataset() throws Exception {
 		/*
@@ -70,11 +55,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
+        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges()
                         .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -83,6 +68,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,70\n" +
 	                "4,5,90\n" +
 	                "5,1,102\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -96,11 +83,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges().first(3)
                         .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -109,6 +96,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -122,11 +111,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges().first(3)
                         .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -135,6 +124,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -148,11 +139,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
                 new DoubleValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -161,6 +152,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -173,11 +166,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
                 new CustomValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,10\n" +
 	                "1,3,20\n" +
@@ -186,6 +179,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -199,11 +194,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges()
                         .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,25\n" +
@@ -212,6 +207,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,69\n" +
 	                "4,5,90\n" +
 	                "5,1,102\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -225,11 +222,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
                         .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,25\n" +
@@ -238,6 +235,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -251,11 +250,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
                         .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -264,6 +263,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -277,11 +278,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
                 new DoubleValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,20\n" +
 	                "1,3,20\n" +
@@ -290,6 +291,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,80\n" +
 	                "4,5,120\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -302,11 +305,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
                 new CustomValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,10\n" +
 	                "1,3,10\n" +
@@ -315,6 +318,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,40\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -328,11 +333,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges()
                         .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -341,6 +346,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,70\n" +
 	                "4,5,80\n" +
 	                "5,1,102\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -354,14 +361,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
                         .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -370,6 +374,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -383,11 +389,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
                         .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,24\n" +
 	                "1,3,26\n" +
@@ -396,6 +402,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -409,11 +417,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
                 new DoubleValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,20\n" +
 	                "1,3,40\n" +
@@ -422,6 +430,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,140\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@Test
@@ -434,11 +444,11 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
+        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
                 new CustomValueMapper());
 
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
+        DataSet<Edge<Long,Long>> data = res.getEdges();
+        List<Edge<Long, Long>> result= data.collect();
 
         expectedResult = "1,2,10\n" +
 	                "1,3,20\n" +
@@ -447,6 +457,8 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	                "3,5,35\n" +
 	                "4,5,45\n" +
 	                "5,1,51\n";
+        
+		CompareResults.compareResultAsTuples(result, expectedResult);
     }
 
 	@SuppressWarnings("serial")


Mime
View raw message