flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [42/50] [abbrv] flink git commit: [FLINK-1201] [gelly] use MultipleProgramsTestBase for the tests
Date Wed, 11 Feb 2015 10:49:44 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
index 31bd48b..c4d44b0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
@@ -18,219 +18,216 @@
 
 package org.apache.flink.graph.test;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class TestMapVertices extends JavaProgramTestBase {
+public class TestMapVertices extends MultipleProgramsTestBase {
 
-	private static int NUM_PROGRAMS = 5;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public TestMapVertices(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	public TestMapVertices(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
+
+	@After
+	public void after() throws Exception{
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testWithSameValue() throws Exception {
+		/*
+		 * Test mapVertices() keeping the same value type
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
+		
+		mappedVertices.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,2\n" +
+			"2,3\n" +
+			"3,4\n" +
+			"4,5\n" +
+			"5,6\n";
+	}
+
+	@Test
+	public void testWithStringValue() throws Exception {
+		/*
+		 * Test mapVertices() and change the value type to String
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
+		
+		mappedVertices.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,one\n" +
+			"2,two\n" +
+			"3,three\n" +
+			"4,four\n" +
+			"5,five\n";
+	}
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
+	@Test
+	public void testWithtuple1Value() throws Exception {
+		/*
+		 * Test mapVertices() and change the value type to a Tuple1
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
+		
+		mappedVertices.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,(1)\n" +
+			"2,(2)\n" +
+			"3,(3)\n" +
+			"4,(4)\n" +
+			"5,(5)\n";
+	}
+
+	@Test
+	public void testWithCustomType() throws Exception {
+		/*
+		 * Test mapVertices() and change the value type to a custom type
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
-		return toParameterList(tConfigs);
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
+		
+		mappedVertices.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,(T,1)\n" +
+			"2,(T,2)\n" +
+			"3,(T,3)\n" +
+			"4,(T,4)\n" +
+			"5,(T,5)\n";
 	}
+
+	@Test
+	public void testWithCustomParametrizedType() throws Exception {
+		/*
+		 * Test mapVertices() and change the value type to a parameterized custom type
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
+				new ToCustomParametrizedTypeMapper()).getVertices();
+		
+		mappedVertices.writeAsCsv(resultPath);
+		env.execute();
 	
-	private static class GraphProgs {
-		
-		@SuppressWarnings("serial")
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test mapVertices() keeping the same value type
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Long>() {
-					public Long map(Vertex<Long, Long> value) throws Exception {
-						return value.getValue()+1;
-					}
-				}).getVertices();
-				
-				mappedVertices.writeAsCsv(resultPath);
-				env.execute();
-				return "1,2\n" +
-				"2,3\n" +
-				"3,4\n" +
-				"4,5\n" +
-				"5,6\n";
+		expectedResult = "1,(1.0,1)\n" +
+			"2,(2.0,2)\n" +
+			"3,(3.0,3)\n" +
+			"4,(4.0,4)\n" +
+			"5,(5.0,5)\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+		public Long map(Vertex<Long, Long> value) throws Exception {
+			return value.getValue()+1;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
+		public String map(Vertex<Long, Long> vertex) throws Exception {
+			String stringValue;
+			if (vertex.getValue() == 1) {
+				stringValue = "one";
 			}
-			case 2: {
-				/*
-				 * Test mapVertices() and change the value type to String
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, String>() {
-					public String map(Vertex<Long, Long> vertex) throws Exception {
-						String stringValue;
-						if (vertex.getValue() == 1) {
-							stringValue = "one";
-						}
-						else if (vertex.getValue() == 2) {
-							stringValue = "two";
-						}
-						else if (vertex.getValue() == 3) {
-							stringValue = "three";
-						}
-						else if (vertex.getValue() == 4) {
-							stringValue = "four";
-						}
-						else if (vertex.getValue() == 5) {
-							stringValue = "five";
-						}
-						else {
-							stringValue = "";
-						}
-						return stringValue;
-					}
-				}).getVertices();
-				
-				mappedVertices.writeAsCsv(resultPath);
-				env.execute();
-				return "1,one\n" +
-				"2,two\n" +
-				"3,three\n" +
-				"4,four\n" +
-				"5,five\n";
+			else if (vertex.getValue() == 2) {
+				stringValue = "two";
 			}
-			case 3: {
-				/*
-				 * Test mapVertices() and change the value type to a Tuple1
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Tuple1<Long>>() {
-					public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
-						Tuple1<Long> tupleValue = new Tuple1<Long>();
-						tupleValue.setFields(vertex.getValue());
-						return tupleValue;
-					}
-				}).getVertices();
-				
-				mappedVertices.writeAsCsv(resultPath);
-				env.execute();
-				return "1,(1)\n" +
-				"2,(2)\n" +
-				"3,(3)\n" +
-				"4,(4)\n" +
-				"5,(5)\n";
+			else if (vertex.getValue() == 3) {
+				stringValue = "three";
 			}
-			case 4: {
-				/*
-				 * Test mapVertices() and change the value type to a custom type
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, DummyCustomType>() {
-					public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
-						DummyCustomType dummyValue = new DummyCustomType();
-						dummyValue.setIntField(vertex.getValue().intValue());						
-						return dummyValue;
-					}
-				}).getVertices();
-				
-				mappedVertices.writeAsCsv(resultPath);
-				env.execute();
-				return "1,(T,1)\n" +
-				"2,(T,2)\n" +
-				"3,(T,3)\n" +
-				"4,(T,4)\n" +
-				"5,(T,5)\n";
+			else if (vertex.getValue() == 4) {
+				stringValue = "four";
 			}
-			case 5: {
-				/*
-				 * Test mapVertices() and change the value type to a parameterized custom type
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
-						new MapFunction<Vertex<Long, Long>, DummyCustomParameterizedType<Double>>() {
-					public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
-						DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-						dummyValue.setIntField(vertex.getValue().intValue());
-						dummyValue.setTField(new Double(vertex.getValue()));						
-						return dummyValue;
-					}
-				}).getVertices();
-				
-				mappedVertices.writeAsCsv(resultPath);
-				env.execute();
-				return "1,(1.0,1)\n" +
-				"2,(2.0,2)\n" +
-				"3,(3.0,3)\n" +
-				"4,(4.0,4)\n" +
-				"5,(5.0,5)\n";
+			else if (vertex.getValue() == 5) {
+				stringValue = "five";
 			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+			else {
+				stringValue = "";
 			}
+			return stringValue;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
+		public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
+			Tuple1<Long> tupleValue = new Tuple1<Long>();
+			tupleValue.setFields(vertex.getValue());
+			return tupleValue;
 		}
 	}
 
+	@SuppressWarnings("serial")
+	private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
+		public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
+			DummyCustomType dummyValue = new DummyCustomType();
+			dummyValue.setIntField(vertex.getValue().intValue());						
+			return dummyValue;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, 
+		DummyCustomParameterizedType<Double>> {
+		
+		public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
+			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+			dummyValue.setIntField(vertex.getValue().intValue());
+			dummyValue.setTField(new Double(vertex.getValue()));						
+			return dummyValue;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
index 89ef7c1..7a02ffe 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java
@@ -18,297 +18,300 @@
 
 package org.apache.flink.graph.test;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.EdgesFunction;
 import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class TestReduceOnEdgesMethods extends JavaProgramTestBase {
+public class TestReduceOnEdgesMethods extends MultipleProgramsTestBase {
 
-	private static int NUM_PROGRAMS = 6;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public TestReduceOnEdgesMethods(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	public TestReduceOnEdgesMethods(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
+
+	@After
+	public void after() throws Exception{
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
+
+	@Test
+	public void testLowestWeightOutNeighbor() throws Exception {
+		/*
+		 * Get the lowest-weight out-neighbor
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
+				graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+		env.execute();
 	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		expectedResult = "1,2\n" +
+				"2,3\n" + 
+				"3,4\n" +
+				"4,5\n" + 
+				"5,1\n";
+	}
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testLowestWeightInNeighbor() throws Exception {
+		/*
+		 * Get the lowest-weight in-neighbor
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
+				graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,5\n" +
+					"2,1\n" + 
+					"3,1\n" +
+					"4,3\n" + 
+					"5,3\n";
 	}
-	
-	private static class GraphProgs {
-	
-		@SuppressWarnings("serial")
-		public static String runProgram(int progId, String resultPath) throws Exception {
+
+	@Test
+	public void testMaxWeightEdge() throws Exception {
+		/*
+		 * Get the maximum weight among all edges
+		 * of a vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
+				graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+		verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,51\n" +
+				"2,23\n" + 
+				"3,35\n" +
+				"4,45\n" + 
+				"5,51\n";
+	}
+
+	@Test
+	public void testLowestWeightOutNeighborNoValue() throws Exception {
+		/*
+		 * Get the lowest-weight out-neighbor
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
+				graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
+		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2\n" +
+				"2,3\n" + 
+				"3,4\n" +
+				"4,5\n" + 
+				"5,1\n";
+	}
+
+	@Test
+	public void testLowestWeightInNeighborNoValue() throws Exception {
+		/*
+		 * Get the lowest-weight in-neighbor
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
+				graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,5\n" +
+				"2,1\n" + 
+				"3,1\n" +
+				"4,3\n" + 
+				"5,3\n";
+	}
+
+	@Test
+	public void testMaxWeightAllNeighbors() throws Exception {
+		/*
+		 * Get the maximum weight among all edges
+		 * of a vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
+				graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
+		verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,51\n" +
+				"2,23\n" + 
+				"3,35\n" +
+				"4,45\n" + 
+				"5,51\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(
+				Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
 			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Get the lowest-weight out-neighbor
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-						graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(
-							Vertex<Long, Long> v,
-							Iterable<Edge<Long, Long>> edges) {
-						
-						long weight = Long.MAX_VALUE;
-						long minNeighorId = 0;
-						
-						for (Edge<Long, Long> edge: edges) {
-							if (edge.getValue() < weight) {
-								weight = edge.getValue();
-								minNeighorId = edge.getTarget();
-							}
-						}
-						return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-					}
-				}, EdgeDirection.OUT);
-				verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-				env.execute();
-				return "1,2\n" +
-						"2,3\n" + 
-						"3,4\n" +
-						"4,5\n" + 
-						"5,1\n";
-			}
-			case 2: {
-				/*
-				 * Get the lowest-weight in-neighbor
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-						graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(
-							Vertex<Long, Long> v,
-							Iterable<Edge<Long, Long>> edges) {
-						
-						long weight = Long.MAX_VALUE;
-						long minNeighorId = 0;
-						
-						for (Edge<Long, Long> edge: edges) {
-							if (edge.getValue() < weight) {
-								weight = edge.getValue();
-								minNeighorId = edge.getSource();
-							}
-						}
-						return new Tuple2<Long, Long>(v.getId(), minNeighorId);
-					}
-				}, EdgeDirection.IN);
-				verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-				env.execute();
-				return "1,5\n" +
-						"2,1\n" + 
-						"3,1\n" +
-						"4,3\n" + 
-						"5,3\n";
+			long weight = Long.MAX_VALUE;
+			long minNeighorId = 0;
+			
+			for (Edge<Long, Long> edge: edges) {
+				if (edge.getValue() < weight) {
+					weight = edge.getValue();
+					minNeighorId = edge.getTarget();
+				}
 			}
-			case 3: {
-				/*
-				 * Get the maximum weight among all edges
-				 * of a vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-						graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
-							Iterable<Edge<Long, Long>> edges) {
-						
-						long weight = Long.MIN_VALUE;
-
-						for (Edge<Long, Long> edge: edges) {
-							if (edge.getValue() > weight) {
-								weight = edge.getValue();
-							}
-						}
-						return new Tuple2<Long, Long>(v.getId(), weight);
-					}
-				}, EdgeDirection.ALL);
-				verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-				env.execute();
-				return "1,51\n" +
-						"2,23\n" + 
-						"3,35\n" +
-						"4,45\n" + 
-						"5,51\n";
+			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
+			
+			long weight = Long.MIN_VALUE;
+
+			for (Edge<Long, Long> edge: edges) {
+				if (edge.getValue() > weight) {
+					weight = edge.getValue();
+				}
 			}
-			case 4: {
-				/*
-				 * Get the lowest-weight out-neighbor
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-						graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
-						long weight = Long.MAX_VALUE;
-						long minNeighorId = 0;
-						long vertexId = -1;
-						long i=0;
-
-						for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-							if (edge.f1.getValue() < weight) {
-								weight = edge.f1.getValue();
-								minNeighorId = edge.f1.getTarget();
-							}
-							if (i==0) {
-								vertexId = edge.f0;
-							} i++;
-						}
-						return new Tuple2<Long, Long>(vertexId, minNeighorId);
-					}
-				}, EdgeDirection.OUT);
-				verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-				env.execute();
-				return "1,2\n" +
-						"2,3\n" + 
-						"3,4\n" +
-						"4,5\n" + 
-						"5,1\n";
+			return new Tuple2<Long, Long>(v.getId(), weight);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+			long weight = Long.MAX_VALUE;
+			long minNeighorId = 0;
+			long vertexId = -1;
+			long i=0;
+
+			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+				if (edge.f1.getValue() < weight) {
+					weight = edge.f1.getValue();
+					minNeighorId = edge.f1.getTarget();
+				}
+				if (i==0) {
+					vertexId = edge.f0;
+				} i++;
 			}
-			case 5: {
-				/*
-				 * Get the lowest-weight in-neighbor
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-						graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-						
-						long weight = Long.MAX_VALUE;
-						long minNeighorId = 0;
-						long vertexId = -1;
-						long i=0;
-
-						for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-							if (edge.f1.getValue() < weight) {
-								weight = edge.f1.getValue();
-								minNeighorId = edge.f1.getSource();
-							}
-							if (i==0) {
-								vertexId = edge.f0;
-							} i++;
-						}
-						return new Tuple2<Long, Long>(vertexId, minNeighorId);
-					}
-				}, EdgeDirection.IN);
-				verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
-				env.execute();
-				return "1,5\n" +
-						"2,1\n" + 
-						"3,1\n" +
-						"4,3\n" + 
-						"5,3\n";
+			return new Tuple2<Long, Long>(vertexId, minNeighorId);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+			
+			long weight = Long.MIN_VALUE;
+			long vertexId = -1;
+			long i=0;
+
+			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+				if (edge.f1.getValue() > weight) {
+					weight = edge.f1.getValue();
+				}
+				if (i==0) {
+					vertexId = edge.f0;
+				} i++;
 			}
-			case 6: {
-				/*
-				 * Get the maximum weight among all edges
-				 * of a vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-						graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() {
-
-					public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-						
-						long weight = Long.MIN_VALUE;
-						long vertexId = -1;
-						long i=0;
-
-						for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-							if (edge.f1.getValue() > weight) {
-								weight = edge.f1.getValue();
-							}
-							if (i==0) {
-								vertexId = edge.f0;
-							} i++;
-						}
-						return new Tuple2<Long, Long>(vertexId, weight);
-					}
-				}, EdgeDirection.ALL);
-				verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
-				env.execute();
-				return "1,51\n" +
-						"2,23\n" + 
-						"3,35\n" +
-						"4,45\n" + 
-						"5,51\n";
+			return new Tuple2<Long, Long>(vertexId, weight);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(
+				Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
+			
+			long weight = Long.MAX_VALUE;
+			long minNeighorId = 0;
+			
+			for (Edge<Long, Long> edge: edges) {
+				if (edge.getValue() < weight) {
+					weight = edge.getValue();
+					minNeighorId = edge.getSource();
+				}
 			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+			
+			long weight = Long.MAX_VALUE;
+			long minNeighorId = 0;
+			long vertexId = -1;
+			long i=0;
+
+			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+				if (edge.f1.getValue() < weight) {
+					weight = edge.f1.getValue();
+					minNeighorId = edge.f1.getSource();
+				}
+				if (i==0) {
+					vertexId = edge.f0;
+				} i++;
 			}
+			return new Tuple2<Long, Long>(vertexId, minNeighorId);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
index 2624960..e64eacf 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java
@@ -18,273 +18,286 @@
 
 package org.apache.flink.graph.test;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.NeighborsFunction;
 import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class TestReduceOnNeighborMethods extends JavaProgramTestBase {
+public class TestReduceOnNeighborMethods extends MultipleProgramsTestBase {
 
-	private static int NUM_PROGRAMS = 6;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public TestReduceOnNeighborMethods(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	public TestReduceOnNeighborMethods(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
+
+	@After
+	public void after() throws Exception{
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testSumOfOutNeighbors() throws Exception {
+		/*
+		 * Get the sum of out-neighbor values
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
+				graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,5\n" +
+				"2,3\n" + 
+				"3,9\n" +
+				"4,5\n" + 
+				"5,1\n";
+	}
+
+	@Test
+	public void testSumOfInNeighbors() throws Exception {
+		/*
+		 * Get the sum of in-neighbor values
+		 * times the edge weights for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSum = 
+				graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);		
+
+		verticesWithSum.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,255\n" +
+				"2,12\n" + 
+				"3,59\n" +
+				"4,102\n" + 
+				"5,285\n";
 	}
+
+	@Test
+	public void testSumOfOAllNeighbors() throws Exception {
+		/*
+		 * Get the sum of all neighbor values
+		 * including own vertex value
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
+				graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,11\n" +
+				"2,6\n" + 
+				"3,15\n" +
+				"4,12\n" + 
+				"5,13\n";
+	}
+
+	@Test
+	public void testSumOfOutNeighborsNoValue() throws Exception {
+		/*
+		 * Get the sum of out-neighbor values
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
+				graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,5\n" +
+				"2,3\n" + 
+				"3,9\n" +
+				"4,5\n" + 
+				"5,1\n";
+	}
+
+	@Test
+	public void testSumOfInNeighborsNoValue() throws Exception {
+		/*
+		 * Get the sum of in-neighbor values
+		 * times the edge weights for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSum = 
+				graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+
+		verticesWithSum.writeAsCsv(resultPath);
+		env.execute();
 	
-	private static class GraphProgs {
+		expectedResult = "1,255\n" +
+				"2,12\n" + 
+				"3,59\n" +
+				"4,102\n" + 
+				"5,285\n";
+	}
+
+	@Test
+	public void testSumOfAllNeighborsNoValue() throws Exception {
+		/*
+		 * Get the sum of all neighbor values
+		 * for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
+				graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
 	
-		@SuppressWarnings("serial")
-		public static String runProgram(int progId, String resultPath) throws Exception {
+		expectedResult = "1,10\n" +
+				"2,4\n" + 
+				"3,12\n" +
+				"4,8\n" + 
+				"5,8\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+	Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
 			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Get the sum of out-neighbor values
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-						graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-									Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-									sum += neighbor.f1.getValue();
-								}
-								return new Tuple2<Long, Long>(vertex.getId(), sum);
-							}
-						}, EdgeDirection.OUT);
-
-				verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-				env.execute();
-				return "1,5\n" +
-						"2,3\n" + 
-						"3,9\n" +
-						"4,5\n" + 
-						"5,1\n";
-			}
-			case 2: {
-				/*
-				 * Get the sum of in-neighbor values
-				 * times the edge weights for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSum = 
-						graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-									Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-									sum += neighbor.f0.getValue() * neighbor.f1.getValue();
-								}
-								return new Tuple2<Long, Long>(vertex.getId(), sum);
-							}
-						}, EdgeDirection.IN);		
-
-				verticesWithSum.writeAsCsv(resultPath);
-				env.execute();
-				return "1,255\n" +
-						"2,12\n" + 
-						"3,59\n" +
-						"4,102\n" + 
-						"5,285\n";
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
 			}
-			case 3: {
-				/*
-				 * Get the sum of all neighbor values
-				 * including own vertex value
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-						graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-									Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
-									sum += neighbor.f1.getValue();
-								}
-								return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
-							}
-						}, EdgeDirection.ALL);
-
-				verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-				env.execute();
-				return "1,11\n" +
-						"2,6\n" + 
-						"3,15\n" +
-						"4,12\n" + 
-						"5,13\n";
+			return new Tuple2<Long, Long>(vertex.getId(), sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+		
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
 			}
-			case 4: {
-				/*
-				 * Get the sum of out-neighbor values
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-						graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(
-									Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-								Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-										neighbors.iterator();
-								while(neighborsIterator.hasNext()) {
-									next = neighborsIterator.next();
-									sum += next.f2.getValue();
-								}
-								return new Tuple2<Long, Long>(next.f0, sum);
-							}
-						}, EdgeDirection.OUT);
-
-				verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-				env.execute();
-				return "1,5\n" +
-						"2,3\n" + 
-						"3,9\n" +
-						"4,5\n" + 
-						"5,1\n";
+			return new Tuple2<Long, Long>(vertex.getId(), sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+		Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+	
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
 			}
-			case 5: {
-				/*
-				 * Get the sum of in-neighbor values
-				 * times the edge weights for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSum = 
-						graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(
-									Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-								Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-										neighbors.iterator();
-								while(neighborsIterator.hasNext()) {
-									next = neighborsIterator.next();
-									sum += next.f2.getValue() * next.f1.getValue();
-								}
-								return new Tuple2<Long, Long>(next.f0, sum);
-							}
-						}, EdgeDirection.IN);
-
-
-				verticesWithSum.writeAsCsv(resultPath);
-				env.execute();
-				return "1,255\n" +
-						"2,12\n" + 
-						"3,59\n" +
-						"4,102\n" + 
-						"5,285\n";
+			return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
 			}
-			case 6: {
-				/*
-				 * Get the sum of all neighbor values
-				 * for each vertex
-		         */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-						TestGraphUtils.getLongLongEdgeData(env), env);
-
-				DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-						graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, 
-								Tuple2<Long, Long>>() {
-							public Tuple2<Long, Long> iterateNeighbors(
-									Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-								long sum = 0;
-								Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-								Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-										neighbors.iterator();
-								while(neighborsIterator.hasNext()) {
-									next = neighborsIterator.next();
-									sum += next.f2.getValue();
-								}
-								return new Tuple2<Long, Long>(next.f0, sum);
-							}
-						}, EdgeDirection.ALL);
-
-				verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
-				env.execute();
-				return "1,10\n" +
-						"2,4\n" + 
-						"3,12\n" +
-						"4,8\n" + 
-						"5,8\n";
+			return new Tuple2<Long, Long>(next.f0, sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+		
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue() * next.f1.getValue();
 			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
+			return new Tuple2<Long, Long>(next.f0, sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+	
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
 			}
+			return new Tuple2<Long, Long>(next.f0, sum);
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
index 7f1049c..f5b6d9d 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java
@@ -18,126 +18,101 @@
 
 package org.apache.flink.graph.test;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-public class TestWeaklyConnected extends JavaProgramTestBase {
+public class TestWeaklyConnected extends MultipleProgramsTestBase {
 
-	private static int NUM_PROGRAMS = 4;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public TestWeaklyConnected(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
+	public TestWeaklyConnected(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
 	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
+
+	@After
+	public void after() throws Exception{
 		compareResultsByLinesInMemory(expectedResult, resultPath);
 	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
 
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+	@Test
+	public void testWithConnectedDirected() throws Exception {
+		/*
+		 * Test isWeaklyConnected() with a connected, directed graph
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+		
+		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		
+		env.execute();
+		expectedResult = "true\n";
+	}
 
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
+	@Test
+	public void testWithDisconnectedDirected() throws Exception {
+		/*
+		 * Test isWeaklyConnected() with a disconnected, directed graph
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
 		
-		return toParameterList(tConfigs);
+		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		
+		env.execute();
+		expectedResult = "false\n";
 	}
-	
-	private static class GraphProgs {
+
+	@Test
+	public void testWithConnectedUndirected() throws Exception {
+		/*
+		 * Test isWeaklyConnected() with a connected, undirected graph
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
+		
+		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		
+		env.execute();
+		expectedResult = "true\n";
+	}
+
+	@Test
+	public void testWithDisconnectedUndirected() throws Exception {
+		/*
+		 * Test isWeaklyConnected() with a disconnected, undirected graph
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
+		
+		graph.isWeaklyConnected(10).writeAsText(resultPath);
 		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test isWeaklyConnected() with a connected, directed graph
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env);
-				
-				graph.isWeaklyConnected(10).writeAsText(resultPath);
-				
-				env.execute();
-				return "true\n";
-			}
-			case 2: {
-				/*
-				 * Test isWeaklyConnected() with a disconnected, directed graph
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-				
-				graph.isWeaklyConnected(10).writeAsText(resultPath);
-				
-				env.execute();
-				return "false\n";
-			}
-			case 3: {
-				/*
-				 * Test isWeaklyConnected() with a connected, undirected graph
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
-				
-				graph.isWeaklyConnected(10).writeAsText(resultPath);
-				
-				env.execute();
-				return "true\n";
-			}
-			case 4: {
-				/*
-				 * Test isWeaklyConnected() with a disconnected, undirected graph
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-						TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
-				
-				graph.isWeaklyConnected(10).writeAsText(resultPath);
-				
-				env.execute();
-				return "false\n";
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-		}
+		env.execute();
+		expectedResult = "false\n";
 	}
-	
 }


Mime
View raw message