flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/4] flink git commit: [FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation
Date Thu, 30 Jun 2016 18:57:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6c6b17b4d -> 7324b9c17


http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 7755708..2c7e093 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -16,28 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.graph.spargel;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DeltaIterationResultSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.TwoInputUdfOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class SpargelTranslationTest {
@@ -46,28 +45,28 @@ public class SpargelTranslationTest {
 	public void testTranslationPlainEdges() {
 		try {
 			final String ITERATION_NAME = "Test Name";
-			
+
 			final String AGGREGATOR_NAME = "AggregatorName";
-			
+
 			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
+
 			final String BC_SET_UPDATES_NAME = "borat updates";
 
 			final int NUM_ITERATIONS = 13;
-			
+
 			final int ITERATION_parallelism = 77;
-			
-			
+
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Long> bcMessaging = env.fromElements(1L);
 			DataSet<Long> bcUpdate = env.fromElements(1L);
-			
+
 			DataSet<Vertex<String, Double>> result;
-			
+
 			// ------------ construct the test program ------------------
 			{
-				
+
 				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc",
3.44));
 
 				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a",
"c"));
@@ -83,41 +82,41 @@ public class SpargelTranslationTest {
 
 				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
-				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
-				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+				parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+				parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
 				parameters.setName(ITERATION_NAME);
 				parameters.setParallelism(ITERATION_parallelism);
 				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 
-				result = graph.runScatterGatherIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+				result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
 						NUM_ITERATIONS, parameters).getVertices();
 
 				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
 			}
-			
-			
+
+
 			// ------------- validate the java program ----------------
-			
+
 			assertTrue(result instanceof DeltaIterationResultSet);
-			
+
 			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>)
result;
 			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-			
+
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
 			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
 			assertEquals(ITERATION_parallelism, iteration.getParallelism());
 			assertEquals(ITERATION_NAME, iteration.getName());
-			
+
 			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
+
 			// validate that the semantic properties are set as they should
 			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?,
?, ?>) resultSet.getNextWorkset();
 			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
 			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-			
+
 			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>)
solutionSetJoin.getInput1();
-			
+
 			// validate that the broadcast sets are forwarded
 			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
 			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
@@ -128,29 +127,29 @@ public class SpargelTranslationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
 		try {
 			final String ITERATION_NAME = "Test Name";
-			
+
 			final String AGGREGATOR_NAME = "AggregatorName";
-			
+
 			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
+
 			final String BC_SET_UPDATES_NAME = "borat updates";
 
 			final int NUM_ITERATIONS = 13;
-			
+
 			final int ITERATION_parallelism = 77;
-			
-			
+
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Long> bcVar = env.fromElements(1L);
-			
+
 			DataSet<Vertex<String, Double>> result;
-			
+
 			// ------------ construct the test program ------------------
 			{
 
@@ -169,41 +168,41 @@ public class SpargelTranslationTest {
 
 				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
-				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
-				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+				parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
+				parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
 				parameters.setName(ITERATION_NAME);
 				parameters.setParallelism(ITERATION_parallelism);
 				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
-				result = graph.runScatterGatherIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+
+				result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
 						NUM_ITERATIONS, parameters).getVertices();
 
 				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
 			}
-			
-			
+
+
 			// ------------- validate the java program ----------------
-			
+
 			assertTrue(result instanceof DeltaIterationResultSet);
-			
+
 			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>)
result;
 			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-			
+
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
 			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
 			assertEquals(ITERATION_parallelism, iteration.getParallelism());
 			assertEquals(ITERATION_NAME, iteration.getName());
-			
+
 			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
+
 			// validate that the semantic properties are set as they should
 			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?,
?, ?>) resultSet.getNextWorkset();
 			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
 			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-			
+
 			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>)
solutionSetJoin.getInput1();
-			
+
 			// validate that the broadcast sets are forwarded
 			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
 			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
@@ -214,18 +213,18 @@ public class SpargelTranslationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long>
{
+
+	private static class MessageFunctionNoEdgeValue extends ScatterFunction<String, Double,
Long, NullValue> {
 
 		@Override
-		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long>
inMessages) {}
+		public void sendMessages(Vertex<String, Double> vertex) {}
 	}
-	
-	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double,
Long, NullValue> {
+
+	private static class UpdateFunction extends GatherFunction<String, Double, Long> {
 
 		@Override
-		public void sendMessages(Vertex<String, Double> vertex) {}
+		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long>
inMessages) {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index b1c2a2c..cb7573c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,47 +36,46 @@ public class CollectionModeSuperstepITCase {
 
 	/**
 	 * Dummy iteration to test that the supersteps are correctly incremented
-	 * and can be retrieved from inside the updated and messaging functions.
+	 * and can be retrieved from inside the scatter and gather functions.
 	 * All vertices start with value 1 and increase their value by 1
 	 * in each iteration. 
 	 */
 	@Test
 	public void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),

+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
-		
+
 		Graph<Long, Long, Long> result = graph.runScatterGatherIteration(
-				new UpdateFunction(), new MessageFunction(), 10);
+				new MessageFunction(), new UpdateFunction(), 10);
 
 		result.getVertices().map(
 				new VertexToTuple2Map<Long, Long>()).output(
 						new DiscardingOutputFormat<Tuple2<Long, Long>>());
 		env.execute();
 	}
-	
-	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long>
{
+
+	private static final class MessageFunction extends ScatterFunction<Long, Long, Long,
Long> {
 		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
+		public void sendMessages(Vertex<Long, Long> vertex) {
 			long superstep = getSuperstepNumber();
 			Assert.assertEquals(true, vertex.getValue() == superstep);
-			setNewVertexValue(vertex.getValue() + 1);
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
-	
-	public static final class MessageFunction extends MessagingFunction<Long, Long, Long,
Long> {
+
+	private static final class UpdateFunction extends GatherFunction<Long, Long, Long>
{
 		@Override
-		public void sendMessages(Vertex<Long, Long> vertex) {
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
 			long superstep = getSuperstepNumber();
 			Assert.assertEquals(true, vertex.getValue() == superstep);
-			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertex.getValue());
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 
-	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>,
Long> {
-
+	private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>,
Long> {
 		public Long map(Vertex<Long, Long> value) {
 			return 1L;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index f14e002..fcd0d82 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.graph.test;
 
-import java.util.HashSet;
-import java.util.List;
-
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -31,18 +28,21 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
+
+import java.util.HashSet;
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
@@ -59,30 +59,30 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		 * Test Graph's runScatterGatherIteration when configuration parameters are provided
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),

+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
 
 		// create the configuration object
 		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
-		parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
-		parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4,
5, 6));
+		parameters.addBroadcastSetForScatterFunction("messagingBcastSet", env.fromElements(4, 5,
6));
+		parameters.addBroadcastSetForGatherFunction("updateBcastSet", env.fromElements(1, 2, 3));
 		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
 		parameters.setOptNumVertices(true);
 
 		Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
-				new UpdateFunction(), new MessageFunction(), 10, parameters);
+				new MessageFunction(), new UpdateFunction(), 10, parameters);
 
 		DataSet<Vertex<Long,Long>> data = res.getVertices();
 		List<Vertex<Long,Long>> result= data.collect();
-        
+
 		expectedResult = "1,11\n" +
 						"2,11\n" +
 						"3,11\n" +
 						"4,11\n" +
 						"5,11";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -95,29 +95,29 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		ScatterGatherIteration<Long, Long, Long, Long> iteration = ScatterGatherIteration
-				.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(), 
-						new DummyMessageFunction(), 10);
-		
+				.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyMessageFunction(),
+						new DummyUpdateFunction(), 10);
+
 		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 		parameters.setName("gelly iteration");
 		parameters.setParallelism(2);
 		parameters.setSolutionSetUnmanagedMemory(true);
-		
+
 		iteration.configure(parameters);
-		
+
 		Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
 		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
 		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
 		DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
         List<Vertex<Long,Long>> result= data.collect();
-        
+
 		expectedResult = "1,11\n" +
 						"2,12\n" +
 						"3,13\n" +
 						"4,14\n" +
 						"5,15";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -128,23 +128,23 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		 * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),

+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
 
 		Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
-				new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
+				new MessageFunctionDefault(), new UpdateFunctionDefault(), 5);
+
 
-		
 		DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long,
Long>());
 		List<Tuple2<Long, Long>> result= data.collect();
-        
+
 		expectedResult = "1,6\n" +
 						"2,6\n" +
 						"3,6\n" +
 						"4,6\n" +
 						"5,6";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -162,7 +162,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				.mapVertices(new InitialiseHashSetMapper());
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5)
+				.runScatterGatherIteration(new IdMessengerTrg(), new VertexUpdateDirection(), 5)
 				.getVertices();
 
         List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
@@ -172,7 +172,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[1, 2]\n" +
 				"4,[3]\n" +
 				"5,[3, 4]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -195,7 +195,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.IN);
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters)
+				.runScatterGatherIteration(new IdMessengerSrc(), new VertexUpdateDirection(), 5, parameters)
 				.getVertices();
 
         List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
@@ -205,7 +205,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[4, 5]\n" +
 				"4,[5]\n" +
 				"5,[1]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -228,7 +228,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.ALL);
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
+				.runScatterGatherIteration(new IdMessengerAll(), new VertexUpdateDirection(), 5, parameters)
 				.getVertices();
 
 		List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
@@ -238,7 +238,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[1, 2, 4, 5]\n" +
 				"4,[3, 5]\n" +
 				"5,[1, 3, 4]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -261,7 +261,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.IN);
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters)
 				.getVertices();
 
 		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
@@ -271,7 +271,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[4, 5]\n" +
 				"4,[5]\n" +
 				"5,[1]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -294,7 +294,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.OUT);
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters)
 				.getVertices();
 
 		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
@@ -304,7 +304,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[1, 2]\n" +
 				"4,[3]\n" +
 				"5,[3, 4]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -327,7 +327,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.ALL);
 
 		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
-				.runScatterGatherIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.runScatterGatherIteration(new SendMsgToAll(), new VertexUpdateDirection(), 5, parameters)
 				.getVertices();
 
 		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
@@ -337,7 +337,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,[1, 2, 4, 5]\n" +
 				"4,[3, 5]\n" +
 				"5,[1, 3, 4]";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -353,8 +353,8 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env);
 
-		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runScatterGatherIteration(new
UpdateFunctionNumVertices(),
-				new DummyMessageFunction(), 2).getVertices();
+		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runScatterGatherIteration(new
DummyMessageFunction(),
+				new UpdateFunctionNumVertices(), 2).getVertices();
 
 		List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
 
@@ -363,7 +363,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,-1\n" +
 				"4,-1\n" +
 				"5,-1";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -371,9 +371,9 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	public void testInDegreesSet() throws Exception {
 
 		/*
-		 * Test that if the degrees are set, they can be accessed in every superstep 
+		 * Test that if the degrees are set, they can be accessed in every superstep
 		 * inside the update function and the value
-		 * is correctly computed for degrees in the messaging function.
+		 * is correctly computed for degrees in the scatter function.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -386,7 +386,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setOptDegrees(true);
 
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration(
-				new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
+				new DegreesMessageFunction(), new UpdateFunctionInDegrees(), 5, parameters).getVertices();
 
 		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -395,7 +395,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,2\n" +
 				"4,1\n" +
 				"5,2";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -411,7 +411,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				TestGraphUtils.getLongLongEdges(), env);
 
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration(
-				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
+				new DummyMessageFunction(), new UpdateFunctionInDegrees(), 2).getVertices();
 
 		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -420,7 +420,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,-1\n" +
 				"4,-1\n" +
 				"5,-1";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -430,7 +430,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		/*
 		 * Test that if the degrees are set, they can be accessed in every superstep
 		 * inside the update function and the value
-		 * is correctly computed for degrees in the messaging function.
+		 * is correctly computed for degrees in the scatter function.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -443,7 +443,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setOptDegrees(true);
 
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration(
-				new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
+				new DegreesMessageFunction(), new UpdateFunctionOutDegrees(), 5, parameters).getVertices();
 
 		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -452,7 +452,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,2\n" +
 				"4,1\n" +
 				"5,1";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -468,7 +468,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				TestGraphUtils.getLongLongEdges(), env);
 
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runScatterGatherIteration(
-				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
+				new DummyMessageFunction(), new UpdateFunctionInDegrees(), 2).getVertices();
 
 		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
@@ -477,7 +477,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,-1\n" +
 				"4,-1\n" +
 				"5,-1";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -500,7 +500,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 		parameters.setDirection(EdgeDirection.ALL);
 
 		DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runScatterGatherIteration(
-				new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices();
+				new IdMessenger(), new VertexUpdateNumNeighbors(), 1, parameters).getVertices();
 
 		List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
 
@@ -509,107 +509,107 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 				"3,true\n" +
 				"4,true\n" +
 				"5,true";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long>
{
-
-		LongSumAggregator aggregator = new LongSumAggregator();
+	private static final class MessageFunction extends ScatterFunction<Long, Long, Long,
Long> {
 
 		@Override
 		public void preSuperstep() {
-			
+
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
-			Assert.assertEquals(1, bcastSet.get(0));
-			Assert.assertEquals(2, bcastSet.get(1));
-			Assert.assertEquals(3, bcastSet.get(2));
-			
-			// test aggregator
-			aggregator = getIterationAggregator("superstepAggregator");
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0));
+			Assert.assertEquals(5, bcastSet.get(1));
+			Assert.assertEquals(6, bcastSet.get(2));
 
 			// test number of vertices
 			Assert.assertEquals(5, getNumberOfVertices());
-			
+
+			// test aggregator
+			if (getSuperstepNumber() == 2) {
+				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+				Assert.assertEquals(5, aggrValue);
+			}
 		}
 
 		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
-			long superstep = getSuperstepNumber();
-			aggregator.aggregate(superstep);
-
-			setNewVertexValue(vertex.getValue() + 1);
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long,
Long> {
+	private static final class MessageFunctionDefault extends ScatterFunction<Long, Long,
Long, Long> {
 
 		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
-
+		public void sendMessages(Vertex<Long, Long> vertex) {
 			// test number of vertices
 			Assert.assertEquals(-1, getNumberOfVertices());
 
 			// test degrees
 			Assert.assertEquals(-1, getInDegree());
 			Assert.assertEquals(-1, getOutDegree());
-
-			setNewVertexValue(vertex.getValue() + 1);
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
-	
+
 	@SuppressWarnings("serial")
-	public static final class MessageFunction extends MessagingFunction<Long, Long, Long,
Long> {
+	private static final class UpdateFunction extends GatherFunction<Long, Long, Long>
{
+
+		LongSumAggregator aggregator = new LongSumAggregator();
 
 		@Override
 		public void preSuperstep() {
-			
+
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
-			Assert.assertEquals(4, bcastSet.get(0));
-			Assert.assertEquals(5, bcastSet.get(1));
-			Assert.assertEquals(6, bcastSet.get(2));
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0));
+			Assert.assertEquals(2, bcastSet.get(1));
+			Assert.assertEquals(3, bcastSet.get(2));
+
+			// test aggregator
+			aggregator = getIterationAggregator("superstepAggregator");
 
 			// test number of vertices
 			Assert.assertEquals(5, getNumberOfVertices());
-			
-			// test aggregator
-			if (getSuperstepNumber() == 2) {
-				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
-				Assert.assertEquals(5, aggrValue);
-			}
+
 		}
 
 		@Override
-		public void sendMessages(Vertex<Long, Long> vertex) {
-			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertex.getValue());
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
+			long superstep = getSuperstepNumber();
+			aggregator.aggregate(superstep);
+
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class MessageFunctionDefault extends MessagingFunction<Long, Long,
Long, Long> {
+	private static final class UpdateFunctionDefault extends GatherFunction<Long, Long, Long>
{
 
 		@Override
-		public void sendMessages(Vertex<Long, Long> vertex) {
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
+
 			// test number of vertices
 			Assert.assertEquals(-1, getNumberOfVertices());
 
 			// test degrees
 			Assert.assertEquals(-1, getInDegree());
 			Assert.assertEquals(-1, getOutDegree());
-			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertex.getValue());
+
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long,
Long, Long> {
+	private static final class UpdateFunctionNumVertices extends GatherFunction<Long, Long,
Long> {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
@@ -618,17 +618,16 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long,
Long> {
+	private static final class DummyUpdateFunction extends GatherFunction<Long, Long, Long>
{
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
 			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
-	
-	@SuppressWarnings("serial")
-	public static final class DummyMessageFunction extends MessagingFunction<Long, Long,
Long, Long> {
 
+	@SuppressWarnings("serial")
+	private static final class DummyMessageFunction extends ScatterFunction<Long, Long, Long,
Long> {
 		@Override
 		public void sendMessages(Vertex<Long, Long> vertex) {
 			//send message to keep vertices active
@@ -637,7 +636,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class DegreesMessageFunction extends MessagingFunction<Long, Long,
Long, Long> {
+	private static final class DegreesMessageFunction extends ScatterFunction<Long, Long,
Long, Long> {
 
 		@Override
 		public void sendMessages(Vertex<Long, Long> vertex) {
@@ -655,7 +654,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>,
Long> {
+	private static final class VertexUpdateDirection extends GatherFunction<Long, HashSet<Long>,
Long> {
 
 		@Override
 		public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long>
messages) throws Exception {
@@ -670,7 +669,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long,
Long, Long> {
+	private static final class UpdateFunctionInDegrees extends GatherFunction<Long, Long,
Long> {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
@@ -680,7 +679,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long,
Long, Long> {
+	private static final class UpdateFunctionOutDegrees extends GatherFunction<Long, Long,
Long> {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
@@ -690,7 +689,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long,
Boolean,
+	private static final class VertexUpdateNumNeighbors extends GatherFunction<Long, Boolean,
 			Long> {
 
 		@Override
@@ -706,7 +705,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long,
Long> {
+	private static final class UpdateFunctionDegrees extends GatherFunction<Long, Long, Long>
{
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages)
{
@@ -717,7 +716,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class IdMessengerSrc extends MessagingFunction<Long, HashSet<Long>,
Long, Long> {
+	private static final class IdMessengerSrc extends ScatterFunction<Long, HashSet<Long>,
Long, Long> {
 
 		@Override
 		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception
{
@@ -728,7 +727,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class IdMessengerAll extends MessagingFunction<Long, HashSet<Long>,
Long, Long> {
+	private static final class IdMessengerAll extends ScatterFunction<Long, HashSet<Long>,
Long, Long> {
 
 		@Override
 		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception
{
@@ -743,7 +742,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class SendMsgToAll extends MessagingFunction<Long, HashSet<Long>,
Long, Long> {
+	private static final class SendMsgToAll extends ScatterFunction<Long, HashSet<Long>,
Long, Long> {
 
 		@Override
 		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception
{
@@ -752,7 +751,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long,
Long> {
+	private static final class IdMessenger extends ScatterFunction<Long, Boolean, Long, Long>
{
 
 		@Override
 		public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
@@ -767,7 +766,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class IdMessengerTrg extends MessagingFunction<Long, HashSet<Long>,
Long, Long> {
+	private static final class IdMessengerTrg extends ScatterFunction<Long, HashSet<Long>,
Long, Long> {
 
 		@Override
 		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception
{
@@ -778,7 +777,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>,
Long> {
+	private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>,
Long> {
 
 		public Long map(Vertex<Long, Long> value) {
 			return 1L;
@@ -786,7 +785,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long,
Long>, HashSet<Long>> {
+	private static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long,
Long>, HashSet<Long>> {
 
 		@Override
 		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {


Mime
View raw message