flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [6/7] flink git commit: [gelly] refactored tests; removed duplicate data from TestGraphUtils
Date Wed, 04 Mar 2015 20:33:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
deleted file mode 100644
index 4e2c858..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapVerticesITCase extends MultipleProgramsTestBase {
-
-	public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testWithSameValue() throws Exception {
-		/*
-		 * Test mapVertices() keeping the same value type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
-		
-		mappedVertices.writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,2\n" +
-			"2,3\n" +
-			"3,4\n" +
-			"4,5\n" +
-			"5,6\n";
-	}
-
-	@Test
-	public void testWithStringValue() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to String
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
-		
-		mappedVertices.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,one\n" +
-			"2,two\n" +
-			"3,three\n" +
-			"4,four\n" +
-			"5,five\n";
-	}
-
-	@Test
-	public void testWithtuple1Value() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a Tuple1
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
-		
-		mappedVertices.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,(1)\n" +
-			"2,(2)\n" +
-			"3,(3)\n" +
-			"4,(4)\n" +
-			"5,(5)\n";
-	}
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
-		
-		mappedVertices.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,(T,1)\n" +
-			"2,(T,2)\n" +
-			"3,(T,3)\n" +
-			"4,(T,4)\n" +
-			"5,(T,5)\n";
-	}
-
-	@Test
-	public void testWithCustomParametrizedType() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a parameterized custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
-				new ToCustomParametrizedTypeMapper()).getVertices();
-		
-		mappedVertices.writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,(1.0,1)\n" +
-			"2,(2.0,2)\n" +
-			"3,(3.0,3)\n" +
-			"4,(4.0,4)\n" +
-			"5,(5.0,5)\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
-		public Long map(Vertex<Long, Long> value) throws Exception {
-			return value.getValue()+1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
-		public String map(Vertex<Long, Long> vertex) throws Exception {
-			String stringValue;
-			if (vertex.getValue() == 1) {
-				stringValue = "one";
-			}
-			else if (vertex.getValue() == 2) {
-				stringValue = "two";
-			}
-			else if (vertex.getValue() == 3) {
-				stringValue = "three";
-			}
-			else if (vertex.getValue() == 4) {
-				stringValue = "four";
-			}
-			else if (vertex.getValue() == 5) {
-				stringValue = "five";
-			}
-			else {
-				stringValue = "";
-			}
-			return stringValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
-		public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
-			tupleValue.setFields(vertex.getValue());
-			return tupleValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
-		public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
-			DummyCustomType dummyValue = new DummyCustomType();
-			dummyValue.setIntField(vertex.getValue().intValue());						
-			return dummyValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, 
-		DummyCustomParameterizedType<Double>> {
-		
-		public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-			dummyValue.setIntField(vertex.getValue().intValue());
-			dummyValue.setTField(new Double(vertex.getValue()));						
-			return dummyValue;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
deleted file mode 100644
index 29d76f0..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
-
-	public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testLowestWeightOutNeighbor() throws Exception {
-		/*
-		 * Get the lowest-weight out-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
-		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,2\n" +
-				"2,3\n" + 
-				"3,4\n" +
-				"4,5\n" + 
-				"5,1\n";
-	}
-
-	@Test
-	public void testLowestWeightInNeighbor() throws Exception {
-		/*
-		 * Get the lowest-weight in-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
-		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,5\n" +
-					"2,1\n" + 
-					"3,1\n" +
-					"4,3\n" + 
-					"5,3\n";
-	}
-
-	@Test
-	public void testMaxWeightEdge() throws Exception {
-		/*
-		 * Get the maximum weight among all edges
-		 * of a vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-				graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
-		verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,51\n" +
-				"2,23\n" + 
-				"3,35\n" +
-				"4,45\n" + 
-				"5,51\n";
-	}
-
-	@Test
-	public void testLowestWeightOutNeighborNoValue() throws Exception {
-		/*
-		 * Get the lowest-weight out-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
-		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2\n" +
-				"2,3\n" + 
-				"3,4\n" +
-				"4,5\n" + 
-				"5,1\n";
-	}
-
-	@Test
-	public void testLowestWeightInNeighborNoValue() throws Exception {
-		/*
-		 * Get the lowest-weight in-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
-		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,5\n" +
-				"2,1\n" + 
-				"3,1\n" +
-				"4,3\n" + 
-				"5,3\n";
-	}
-
-	@Test
-	public void testMaxWeightAllNeighbors() throws Exception {
-		/*
-		 * Get the maximum weight among all edges
-		 * of a vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-				graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
-		verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,51\n" +
-				"2,23\n" + 
-				"3,35\n" +
-				"4,45\n" + 
-				"5,51\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(
-				Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() < weight) {
-					weight = edge.getValue();
-					minNeighorId = edge.getTarget();
-				}
-			}
-			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
-			
-			long weight = Long.MIN_VALUE;
-
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() > weight) {
-					weight = edge.getValue();
-				}
-			}
-			return new Tuple2<Long, Long>(v.getId(), weight);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() < weight) {
-					weight = edge.f1.getValue();
-					minNeighorId = edge.f1.getTarget();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
-			}
-			return new Tuple2<Long, Long>(vertexId, minNeighorId);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-			
-			long weight = Long.MIN_VALUE;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() > weight) {
-					weight = edge.f1.getValue();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
-			}
-			return new Tuple2<Long, Long>(vertexId, weight);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(
-				Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() < weight) {
-					weight = edge.getValue();
-					minNeighorId = edge.getSource();
-				}
-			}
-			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() < weight) {
-					weight = edge.f1.getValue();
-					minNeighorId = edge.f1.getSource();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
-			}
-			return new Tuple2<Long, Long>(vertexId, minNeighorId);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
deleted file mode 100644
index d385399..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.NeighborsFunction;
-import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
-
-	public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testSumOfOutNeighbors() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
-
-		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,5\n" +
-				"2,3\n" + 
-				"3,9\n" +
-				"4,5\n" + 
-				"5,1\n";
-	}
-
-	@Test
-	public void testSumOfInNeighbors() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);		
-
-		verticesWithSum.writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,255\n" +
-				"2,12\n" + 
-				"3,59\n" +
-				"4,102\n" + 
-				"5,285\n";
-	}
-
-	@Test
-	public void testSumOfOAllNeighbors() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * including own vertex value
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
-		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,11\n" +
-				"2,6\n" + 
-				"3,15\n" +
-				"4,12\n" + 
-				"5,13\n";
-	}
-
-	@Test
-	public void testSumOfOutNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of out-neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
-
-		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,5\n" +
-				"2,3\n" + 
-				"3,9\n" +
-				"4,5\n" + 
-				"5,1\n";
-	}
-
-	@Test
-	public void testSumOfInNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of in-neighbor values
-		 * times the edge weights for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
-
-		verticesWithSum.writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,255\n" +
-				"2,12\n" + 
-				"3,59\n" +
-				"4,102\n" + 
-				"5,285\n";
-	}
-
-	@Test
-	public void testSumOfAllNeighborsNoValue() throws Exception {
-		/*
-		 * Get the sum of all neighbor values
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
-
-		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,10\n" +
-				"2,4\n" + 
-				"3,12\n" +
-				"4,8\n" + 
-				"5,8\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-	Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-			
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-		
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-		
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
-			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-		Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-	
-			long sum = 0;
-			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-				sum += neighbor.f1.getValue();
-			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue();
-			}
-			return new Tuple2<Long, Long>(next.f0, sum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-		
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-		
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue() * next.f1.getValue();
-			}
-			return new Tuple2<Long, Long>(next.f0, sum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-	
-			long sum = 0;
-			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
-				sum += next.f2.getValue();
-			}
-			return new Tuple2<Long, Long>(next.f0, sum);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 9f6569b..75355f0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -33,27 +33,47 @@ public class TestGraphUtils {
 
 	public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
 			ExecutionEnvironment env) {
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		vertices.add(new Vertex<Long, Long>(1L, 1L));
-		vertices.add(new Vertex<Long, Long>(2L, 2L));
-		vertices.add(new Vertex<Long, Long>(3L, 3L));
-		vertices.add(new Vertex<Long, Long>(4L, 4L));
-		vertices.add(new Vertex<Long, Long>(5L, 5L));
-		
-		return env.fromCollection(vertices);
+
+		return env.fromCollection(getLongLongVertices());
 	}
 	
 	public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
 			ExecutionEnvironment env) {
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
-		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
-		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
-		edges.add(new Edge<Long, Long>(3L, 4L, 34L));
-		edges.add(new Edge<Long, Long>(3L, 5L, 35L));
-		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
-		edges.add(new Edge<Long, Long>(5L, 1L, 51L));
-		
+
+		return env.fromCollection(getLongLongEdges());
+	}
+
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges = getLongLongEdges();
+
+		edges.remove(1);
+		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges =  getLongLongEdges();
+
+		edges.remove(0);
+		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges =  getLongLongEdges();
+
+		edges.remove(0);
+		edges.remove(1);
+		edges.remove(2);
+		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+		edges.add(new Edge<Long, Long>(1L, 12L, 12L));
+		edges.add(new Edge<Long, Long>(13L, 33L, 13L));
+
 		return env.fromCollection(edges);
 	}
 	
@@ -193,12 +213,10 @@ public class TestGraphUtils {
 	 */
 	public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
 			ExecutionEnvironment env) {
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		List<Vertex<Long, Long>> vertices = getLongLongVertices();
+
+		vertices.remove(0);
 		vertices.add(new Vertex<Long, Long>(15L, 1L));
-		vertices.add(new Vertex<Long, Long>(2L, 2L));
-		vertices.add(new Vertex<Long, Long>(3L, 3L));
-		vertices.add(new Vertex<Long, Long>(4L, 4L));
-		vertices.add(new Vertex<Long, Long>(5L, 5L));
 
 		return env.fromCollection(vertices);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
new file mode 100755
index 0000000..185d922
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.LabelPropagationExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
+
+	public LabelPropagationExampleITCase(ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of label propagation example with a simple graph
+		 */
+
+		final String vertices = "1 10\n" +
+				"2 10\n" +
+				"3 30\n" +
+				"4 40\n" +
+				"5 40\n" +
+				"6 40\n" +
+				"7 70\n";
+
+		final String edges = "1 3\n" +
+				"2 3\n" +
+				"4 7\n" +
+				"5 7\n" +
+				"6 7\n" +
+				"7 3\n";
+
+		String verticesPath = createTempFile(vertices);
+		String edgesPath = createTempFile(edges);
+
+		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
+
+		expectedResult = "1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,40\n" +
+			"5,40\n" +
+			"6,40\n" +
+			"7,40\n";
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test the label propagation example where a tie must be broken
+		 */
+
+		final String vertices = "1 10\n" +
+				"2 10\n" +
+				"3 10\n" +
+				"4 10\n" +
+				"5 0\n" +
+				"6 20\n" +
+				"7 20\n" +
+				"8 20\n" +
+				"9 20\n";
+
+		final String edges = "1 5\n" +
+				"2 5\n" +
+				"3 5\n" +
+				"4 5\n" +
+				"6 5\n" +
+				"7 5\n" +
+				"8 5\n" +
+				"9 5\n";
+
+		String verticesPath = createTempFile(vertices);
+		String edgesPath = createTempFile(edges);
+
+		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
+
+		expectedResult = "1,10\n" +
+				"2,10\n" +
+				"3,10\n" +
+				"4,10\n" +
+				"5,20\n" +
+				"6,20\n" +
+				"7,20\n" +
+				"8,20\n" +
+				"9,20\n";
+	}
+
+	// -------------------------------------------------------------------------
+	//  Util methods
+	// -------------------------------------------------------------------------
+
+	private String createTempFile(final String rows) throws Exception {
+		File tempFile = tempFolder.newFile();
+		Files.write(rows, tempFile, Charsets.UTF_8);
+		return tempFile.toURI().toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
new file mode 100644
index 0000000..8c363a5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DegreesITCase extends MultipleProgramsTestBase {
+
+	public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testOutDegrees() throws Exception {
+		/*
+		* Test outDegrees()
+		*/
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2\n" +
+                    "2,1\n" +
+                    "3,2\n" +
+                    "4,1\n" +
+                    "5,1\n";
+    }
+
+	@Test
+	public void testOutDegreesWithNoOutEdges() throws Exception {
+		/*
+		 * Test outDegrees() no outgoing edges
+		 */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,3\n" +
+                "2,1\n" +
+                "3,1\n" +
+                "4,1\n" +
+                "5,0\n";
+    }
+
+	@Test
+	public void testInDegrees() throws Exception {
+		/*
+		 * Test inDegrees()
+		 */
+	    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+	    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+	            TestGraphUtils.getLongLongEdgeData(env), env);
+
+	    graph.inDegrees().writeAsCsv(resultPath);
+	    env.execute();
+	    expectedResult = "1,1\n" +
+		            "2,1\n" +
+		            "3,2\n" +
+		            "4,1\n" +
+		            "5,2\n";
+    }
+
+	@Test
+	public void testInDegreesWithNoInEdge() throws Exception {
+		/*
+		 * Test inDegrees() no ingoing edge
+		 */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+        graph.inDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,0\n" +
+	                "2,1\n" +
+	                "3,1\n" +
+	                "4,1\n" +
+	                "5,3\n";
+    }
+
+	@Test
+	public void testGetDegrees() throws Exception {
+		/*
+		 * Test getDegrees()
+		 */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        graph.getDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,3\n" +
+	                "2,2\n" +
+	                "3,4\n" +
+	                "4,2\n" +
+	                "5,3\n";
+    }
+
+	@Test
+	public void testGetDegreesWithDisconnectedData() throws Exception {
+        /*
+		 * Test getDegrees() with disconnected data
+		 */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, NullValue, Long> graph =
+                Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2\n" +
+                "2,1\n" +
+                "3,0\n" +
+                "4,1\n" +
+                "5,0\n";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
new file mode 100644
index 0000000..975d21a
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FromCollectionITCase extends MultipleProgramsTestBase {
+
+	public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testFromCollectionVerticesEdges() throws Exception {
+		/*
+		 * Test fromCollection(vertices, edges):
+		 */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+                TestGraphUtils.getLongLongEdges(), env);
+
+        graph.getEdges().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2,12\n" +
+	                "1,3,13\n" +
+	                "2,3,23\n" +
+	                "3,4,34\n" +
+	                "3,5,35\n" +
+	                "4,5,45\n" +
+	                "5,1,51\n";
+    }
+
+	@Test
+	public void testFromCollectionEdgesNoInitialValue() throws Exception {
+        /*
+         * Test fromCollection(edges) with no initial value for the vertices
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+        		env);
+
+        graph.getVertices().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,(null)\n" +
+	                "2,(null)\n" +
+	                "3,(null)\n" +
+	                "4,(null)\n" +
+	                "5,(null)\n";
+    }
+
+	@Test
+	public void testFromCollectionEdgesWithInitialValue() throws Exception {
+        /*
+         * Test fromCollection(edges) with vertices initialised by a
+         * function that takes the id and doubles it
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+                new InitVerticesMapper(), env);
+
+        graph.getVertices().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2\n" +
+	                "2,4\n" +
+	                "3,6\n" +
+	                "4,8\n" +
+	                "5,10\n";
+    }
+
+	@SuppressWarnings("serial")
+	private static final class InitVerticesMapper implements MapFunction<Long, Long> {
+        public Long map(Long vertexId) {
+            return vertexId * 2;
+        }
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
new file mode 100644
index 0000000..6848dad
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationITCase extends MultipleProgramsTestBase {
+
+	public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testCreateWithoutVertexValues() throws Exception {
+	/*
+	 * Test create() with edge dataset and no vertex values
+     */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,(null)\n" +
+					"2,(null)\n" +
+					"3,(null)\n" +
+					"4,(null)\n" +
+					"5,(null)\n";
+	}
+
+	@Test
+	public void testCreateWithMapper() throws Exception {
+	/*
+	 * Test create() with edge dataset and a mapper that assigns the id as value
+     */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+				new AssignIdAsValueMapper(), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,1\n" +
+					"2,2\n" +
+					"3,3\n" +
+					"4,4\n" +
+					"5,5\n";
+	}
+
+	@Test
+	public void testCreateWithCustomVertexValue() throws Exception {
+		/*
+		 * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,(2.0,0)\n" +
+				"2,(4.0,1)\n" +
+				"3,(6.0,2)\n" +
+				"4,(8.0,3)\n" +
+				"5,(10.0,4)\n";
+	}
+
+	@Test
+	public void testValidate() throws Exception {
+		/*
+		 * Test validate():
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
+		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+		result.writeAsText(resultPath);
+		env.execute();
+
+		expectedResult = "true\n";
+	}
+
+	@Test
+	public void testValidateWithInvalidIds() throws Exception {
+		/*
+		 * Test validate() - invalid vertex ids
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
+		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+		result.writeAsText(resultPath);
+		env.execute();
+
+		expectedResult = "false\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignCustomVertexValueMapper implements
+		MapFunction<Long, DummyCustomParameterizedType<Double>> {
+
+		DummyCustomParameterizedType<Double> dummyValue =
+				new DummyCustomParameterizedType<Double>();
+
+		public DummyCustomParameterizedType<Double> map(Long vertexId) {
+			dummyValue.setIntField(vertexId.intValue()-1);
+			dummyValue.setTField(vertexId*2.0);
+			return dummyValue;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
new file mode 100644
index 0000000..010ae1d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
+
+	public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testWithDoubleValueMapper() throws Exception {
+		/*
+		 * Test create() with edge dataset and a mapper that assigns a double constant as value
+	     */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+				new AssignDoubleValueMapper(), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,0.1\n" +
+				"2,0.1\n" +
+				"3,0.1\n" +
+				"4,0.1\n" +
+				"5,0.1\n";
+	}
+
+	@Test
+	public void testWithTuple2ValueMapper() throws Exception {
+		/*
+		 * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,(2,42)\n" +
+				"2,(4,42)\n" +
+				"3,(6,42)\n" +
+				"4,(8,42)\n" +
+				"5,(10,42)\n";
+	}
+
+	@Test
+	public void testWithConstantValueMapper() throws Exception {
+	/*
+	 * Test create() with edge dataset with String key type
+	 * and a mapper that assigns a double constant as value
+	 */
+	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
+			new AssignDoubleConstantMapper(), env);
+
+	graph.getVertices().writeAsCsv(resultPath);
+	env.execute();
+	expectedResult = "1,0.1\n" +
+			"2,0.1\n" +
+			"3,0.1\n" +
+			"4,0.1\n" +
+			"5,0.1\n";
+	}
+
+	@Test
+	public void testWithDCustomValueMapper() throws Exception {
+		/*
+		 * Test create() with edge dataset and a mapper that assigns a custom vertex value
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
+
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,(F,0)\n" +
+				"2,(F,1)\n" +
+				"3,(F,2)\n" +
+				"4,(F,3)\n" +
+				"5,(F,4)\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 0.1d;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
+		public Tuple2<Long, Long> map(Long vertexId) {
+			return new Tuple2<Long, Long>(vertexId*2, 42l);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
+		public Double map(String value) {
+			return 0.1d;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
+		public DummyCustomType map(Long vertexId) {
+			return new DummyCustomType(vertexId.intValue()-1, false);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
new file mode 100644
index 0000000..502d529
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphMutationsITCase extends MultipleProgramsTestBase {
+
+	public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testAddVertex() throws Exception {
+		/*
+		 * Test addVertex() -- simple case
+		 */	
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
+		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n" +
+				"6,1,61\n";	
+	}
+
+	@Test
+	public void testAddVertexExisting() throws Exception {
+		/*
+		 * Test addVertex() -- add an existing vertex
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(1L, 5L, 15L));
+		graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges);
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+					"1,3,13\n" +
+					"1,5,15\n" +
+					"2,3,23\n" +
+					"3,4,34\n" +
+					"3,5,35\n" +
+					"4,5,45\n" +
+					"5,1,51\n";
+	}
+
+	@Test
+	public void testAddVertexNoEdges() throws Exception {
+		/*
+		 * Test addVertex() -- add vertex with empty edge set
+		 */	
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
+		graph.getVertices().writeAsCsv(resultPath);
+		env.execute();
+	
+		expectedResult = "1,1\n" +
+			"2,2\n" +
+			"3,3\n" +
+			"4,4\n" +
+			"5,5\n" +
+			"6,6\n";
+	}
+
+	@Test
+	public void testRemoveVertex() throws Exception {
+		/*
+		 * Test removeVertex() -- simple case
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n";
+	}
+
+	@Test
+	public void testRemoveInvalidVertex() throws Exception {
+		/*
+		 * Test removeVertex() -- remove an invalid vertex
+		 */	
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n";
+	}
+	
+	@Test
+	public void testAddEdge() throws Exception {
+		/*
+		 * Test addEdge() -- simple case
+		 */
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
+				61L);
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n" +
+				"6,1,61\n";	
+	}
+	
+	@Test
+	public void testAddExistingEdge() throws Exception {
+		/*
+		 * Test addEdge() -- add already existing edge
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
+				12L);
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n";	
+	}
+	
+	@Test
+	public void testRemoveVEdge() throws Exception {
+		/*
+		 * Test removeEdge() -- simple case
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n";
+	}
+	
+	@Test
+	public void testRemoveInvalidEdge() throws Exception {
+		/*
+		 * Test removeEdge() -- invalid edge
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
+		graph.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n";
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
new file mode 100644
index 0000000..6c4f1ef
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphOperationsITCase extends MultipleProgramsTestBase {
+
+	public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testUndirected() throws Exception {
+		/*
+		 * Test getUndirected()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		graph.getUndirected().getEdges().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,2,12\n" + "2,1,12\n" +
+					"1,3,13\n" + "3,1,13\n" +
+					"2,3,23\n" + "3,2,23\n" +
+					"3,4,34\n" + "4,3,34\n" +
+					"3,5,35\n" + "5,3,35\n" +
+					"4,5,45\n" + "5,4,45\n" +
+					"5,1,51\n" + "1,5,51\n";
+	}
+
+	@Test
+	public void testReverse() throws Exception {
+		/*
+		 * Test reverse()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		graph.reverse().getEdges().writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "2,1,12\n" +
+					"3,1,13\n" +
+					"3,2,23\n" +
+					"4,3,34\n" +
+					"5,3,35\n" +
+					"5,4,45\n" +
+					"1,5,51\n";
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testSubGraph() throws Exception {
+		/*
+		 * Test subgraph:
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
+						   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+							   return (vertex.getValue() > 2);
+						   }
+					   },
+				new FilterFunction<Edge<Long, Long>>() {
+					public boolean filter(Edge<Long, Long> edge) throws Exception {
+						return (edge.getValue() > 34);
+					}
+				}).getEdges().writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult = "3,5,35\n" +
+					"4,5,45\n";
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testFilterVertices() throws Exception {
+		/*
+		 * Test filterOnVertices:
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
+			public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+				return (vertex.getValue() > 2);
+			}
+		}).getEdges().writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult =  "3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n";
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testFilterEdges() throws Exception {
+		/*
+		 * Test filterOnEdges:
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
+			public boolean filter(Edge<Long, Long> edge) throws Exception {
+				return (edge.getValue() > 34);
+			}
+		}).getEdges().writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult = "3,5,35\n" +
+					"4,5,45\n" +
+					"5,1,51\n";
+	}
+
+	@Test
+	public void testNumberOfVertices() throws Exception {
+		/*
+		 * Test numberOfVertices()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.numberOfVertices().writeAsText(resultPath);
+
+		env.execute();
+		expectedResult = "5";
+	}
+
+	@Test
+	public void testNumberOfEdges() throws Exception {
+		/*
+		 * Test numberOfEdges()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.numberOfEdges().writeAsText(resultPath);
+
+		env.execute();
+		expectedResult = "7";
+	}
+
+	@Test
+	public void testVertexIds() throws Exception {
+		/*
+		 * Test getVertexIds()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.getVertexIds().writeAsText(resultPath);
+
+		env.execute();
+		expectedResult = "1\n2\n3\n4\n5\n";
+	}
+
+	@Test
+	public void testEdgesIds() throws Exception {
+		/*
+		 * Test getEdgeIds()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		graph.getEdgeIds().writeAsCsv(resultPath);
+
+		env.execute();
+		expectedResult = "1,2\n" + "1,3\n" +
+				"2,3\n" + "3,4\n" +
+				"3,5\n" + "4,5\n" +
+				"5,1\n";
+	}
+
+	@Test
+	public void testUnion() throws Exception {
+		/*
+		 * Test union()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+
+		vertices.add(new Vertex<Long, Long>(6L, 6L));
+		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
+
+		graph = graph.union(Graph.fromCollection(vertices, edges, env));
+
+		graph.getEdges().writeAsCsv(resultPath);
+
+		env.execute();
+
+		expectedResult = "1,2,12\n" +
+					"1,3,13\n" +
+					"2,3,23\n" +
+					"3,4,34\n" +
+					"3,5,35\n" +
+					"4,5,45\n" +
+					"5,1,51\n" +
+					"6,1,61\n";
+	}
+}
\ No newline at end of file


Mime
View raw message