flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [2/4] flink git commit: [FLINK-3618] [gelly] Rename abstract UDF classes in Scatter-Gather implementation
Date Thu, 30 Jun 2016 18:57:50 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 29183e9..4ff4e79 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -24,9 +24,9 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.spargel.ScatterFunction;
 
 /**
  * This is an implementation of the Single-Source-Shortest Paths algorithm, using a scatter-gather iteration.
@@ -52,7 +52,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
 		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-				.runScatterGatherIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				.runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
 				maxIterations).getVertices();
 	}
 
@@ -74,12 +74,30 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 	}
 
 	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 *
+	 * @param <K>
+	 */
+	public static final class MinDistanceMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<K, Double> vertex) {
+			if (vertex.getValue() < Double.POSITIVE_INFINITY) {
+				for (Edge<K, Double> edge : getEdges()) {
+					sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+				}
+			}
+		}
+	}
+
+	/**
 	 * Function that updates the value of a vertex by picking the minimum
 	 * distance from all incoming messages.
 	 * 
 	 * @param <K>
 	 */
-	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+	public static final class VertexDistanceUpdater<K> extends GatherFunction<K, Double, Double> {
 
 		@Override
 		public void updateVertex(Vertex<K, Double> vertex,
@@ -98,22 +116,4 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 			}
 		}
 	}
-
-	/**
-	 * Distributes the minimum distance associated with a given vertex among all
-	 * the target vertices summed up with the edge's value.
-	 * 
-	 * @param <K>
-	 */
-	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		@Override
-		public void sendMessages(Vertex<K, Double> vertex) {
-			if (vertex.getValue() < Double.POSITIVE_INFINITY) {
-				for (Edge<K, Double> edge : getEdges()) {
-					sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 8272d8f..681d060 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.graph.library;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
new file mode 100644
index 0000000..d56c0da
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ * 
+ * {@code <K>} The vertex key type.
+ * {@code <VV>} The vertex value type.
+ * {@code <Message>} The message type.
+ */
+public abstract class GatherFunction<K, VV, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
+	private boolean optDegrees;
+
+	boolean isOptDegrees() {
+		return optDegrees;
+	}
+
+	void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+	 * state is changed, it will trigger the sending of messages via the {@link ScatterFunction}.
+	 * 
+	 * @param vertex The vertex.
+	 * @param inMessages The incoming messages to this vertex.
+	 * 
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
+
+	/**
+	 * This method is executed once per superstep before the gather function is invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+
+	/**
+	 * This method is executed once per superstep after the gather function has been invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+
+	/**
+	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+	 *
+	 * This should be called at most once per updateVertex.
+	 * 
+	 * @param newValue The new vertex value.
+	 */
+	public void setNewVertexValue(VV newValue) {
+		if(setNewVertexValueCalled) {
+			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+		}
+		setNewVertexValueCalled = true;
+		if(isOptDegrees()) {
+			outValWithDegrees.f1.f0 = newValue;
+			outWithDegrees.collect(outValWithDegrees);
+		} else {
+			outVal.setValue(newValue);
+			out.collect(outVal);
+		}
+	}
+
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * 
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForGatherFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * 
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+
+	private IterationRuntimeContext runtimeContext;
+
+	private Collector<Vertex<K, VV>> out;
+
+	private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
+
+	private Vertex<K, VV> outVal;
+
+	private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+
+	private long inDegree = -1;
+
+	private long outDegree = -1;
+
+	private boolean setNewVertexValueCalled;
+
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+	}
+
+	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+		this.outVal = outVal;
+		this.out = out;
+		setNewVertexValueCalled = false;
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+			Collector out) {
+		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+		this.outWithDegrees = out;
+		setNewVertexValueCalled = false;
+	}
+
+	/**
+	 * Retrieves the vertex in-degree (number of in-coming edges).
+	 * @return The in-degree of this vertex
+	 */
+	public long getInDegree() {
+		return inDegree;
+	}
+
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieve the vertex out-degree (number of out-going edges).
+	 * @return The out-degree of this vertex
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
+
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
+	}
+
+	/**
+	 * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
+	 * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
+	 *
+	 * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
+	 * the regular updateVertex function.
+	 *
+	 * @param vertexState
+	 * @param inMessages
+	 * @throws Exception
+	 */
+	@SuppressWarnings("unchecked")
+	<VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
+												MessageIterator<Message> inMessages) throws Exception {
+
+		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+
+		updateVertex(vertex, inMessages);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
deleted file mode 100644
index e12d779..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
- * 
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// --------------------------------------------------------------------------------------------
-	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-	//  inside an iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private long numberOfVertices = -1L;
-
-	/**
-	 * Retrieves the number of vertices in the graph.
-	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getNumberOfVertices() {
-		return numberOfVertices;
-	}
-
-	void setNumberOfVertices(long numberOfVertices) {
-		this.numberOfVertices = numberOfVertices;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
-	//  the scatter gather iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private EdgeDirection direction;
-
-	/**
-	 * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration.
-	 * @return the messaging {@link EdgeDirection}
-	 */
-	public EdgeDirection getDirection() {
-		return direction;
-	}
-
-	void setDirection(EdgeDirection direction) {
-		this.direction = direction;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
-	 * It needs to produce the messages that will be received by vertices in the next superstep.
-	 * 
-	 * @param vertex The vertex that was changed.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
-	
-	/**
-	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	
-	/**
-	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
-	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * <p>
-	 * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
-	 * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
-	 * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
-	 * 
-	 * @return An iterator with all edges.
-	 */
-	@SuppressWarnings("unchecked")
-	public Iterable<Edge<K, EV>> getEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-		}
-		edgesUsed = true;
-		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
-		return this.edgeIterator;
-	}
-
-	/**
-	 * Sends the given message to all vertices that are targets of an edge of the changed vertex.
-	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
-	 * <p>
-	 * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
-	 * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
-	 * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
-	 * 
-	 * @param m The message to send.
-	 */
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
-					+ "exactly once.");
-		}
-		
-		edgesUsed = true;
-		outValue.f1 = m;
-		
-		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
-
-			/*
-			 * When EdgeDirection is OUT, the edges iterator only has the out-edges 
-			 * of the vertex, i.e. the ones where this vertex is src. 
-			 * next.getField(1) gives the neighbor of the vertex running this MessagingFunction.
-			 */
-			if (getDirection().equals(EdgeDirection.OUT)) {
-				outValue.f0 = next.getField(1);
-			}
-			/*
-			 * When EdgeDirection is IN, the edges iterator only has the in-edges 
-			 * of the vertex, i.e. the ones where this vertex is trg. 
-			 * next.getField(10) gives the neighbor of the vertex running this MessagingFunction.
-			 */
-			else if (getDirection().equals(EdgeDirection.IN)) {
-				outValue.f0 = next.getField(0);
-			}
-			 // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges
-			if (getDirection().equals(EdgeDirection.ALL)) {
-				if (next.getField(0).equals(vertexId)) {
-					// send msg to the trg
-					outValue.f0 = next.getField(1);
-				}
-				else {
-					// send msg to the src
-					outValue.f0 = next.getField(0);
-				}
-			}
-			out.collect(outValue);
-		}
-	}
-	
-	/**
-	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-	 * the next superstep will cause an exception due to a non-deliverable message.
-	 * 
-	 * @param target The key (id) of the target vertex to message.
-	 * @param m The message.
-	 */
-	public void sendMessageTo(K target, Message m) {
-		outValue.f0 = target;
-		outValue.f1 = m;
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Tuple2<K, Message> outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<?> edges;
-	
-	private Collector<Tuple2<K, Message>> out;
-
-	private K vertexId;
-	
-	private EdgesIterator<K, EV> edgeIterator;
-	
-	private boolean edgesUsed;
-
-	private long inDegree = -1;
-
-	private long outDegree = -1;
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-		this.outValue = new Tuple2<K, Message>();
-		this.edgeIterator = new EdgesIterator<K, EV>();
-	}
-	
-	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
-		this.edges = edges;
-		this.out = out;
-		this.vertexId = id;
-		this.edgesUsed = false;
-	}
-	
-	private static final class EdgesIterator<K, EV> 
-		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
-	{
-		private Iterator<Edge<K, EV>> input;
-		
-		private Edge<K, EV> edge = new Edge<K, EV>();
-		
-		void set(Iterator<Edge<K, EV>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public Edge<K, EV> next() {
-			Edge<K, EV> next = input.next();
-			edge.setSource(next.f0);
-			edge.setTarget(next.f1);
-			edge.setValue(next.f2);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		@Override
-		public Iterator<Edge<K, EV>> iterator() {
-			return this;
-		}
-	}
-
-	/**
-	 * Retrieves the vertex in-degree (number of in-coming edges).
-	 * @return The in-degree of this vertex
-	 */
-	public long getInDegree() {
-		return inDegree;
-	}
-
-	void setInDegree(long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	/**
-	 * Retrieve the vertex out-degree (number of out-going edges).
-	 * @return The out-degree of this vertex
-	 */
-	public long getOutDegree() {
-		return outDegree;
-	}
-
-	void setOutDegree(long outDegree) {
-		this.outDegree = outDegree;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
new file mode 100644
index 0000000..336e73d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
+ * 
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public abstract class ScatterFunction<K, VV, Message, EV> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
+	//  the scatter gather iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private EdgeDirection direction;
+
+	/**
+	 * Retrieves the edge direction in which messages are propagated in the scatter-gather iteration.
+	 * @return the messaging {@link EdgeDirection}
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
+	 * It needs to produce the messages that will be received by vertices in the next superstep.
+	 * 
+	 * @param vertex The vertex that was changed.
+	 * 
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
+
+	/**
+	 * This method is executed once per superstep before the scatter function is invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+
+	/**
+	 * This method is executed once per superstep after the scatter function has been invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+
+
+	/**
+	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
+	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+	 * <p>
+	 * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
+	 * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
+	 * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
+	 * 
+	 * @return An iterator with all edges.
+	 */
+	@SuppressWarnings("unchecked")
+	public Iterable<Edge<K, EV>> getEdges() {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+		}
+		edgesUsed = true;
+		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+		return this.edgeIterator;
+	}
+
+	/**
+	 * Sends the given message to all vertices that are targets of an edge of the changed vertex.
+	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+	 * <p>
+	 * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
+	 * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
+	 * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
+	 * 
+	 * @param m The message to send.
+	 */
+	public void sendMessageToAllNeighbors(Message m) {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
+					+ "exactly once.");
+		}
+
+		edgesUsed = true;
+		outValue.f1 = m;
+
+		while (edges.hasNext()) {
+			Tuple next = (Tuple) edges.next();
+
+			/*
+			 * When EdgeDirection is OUT, the edges iterator only has the out-edges 
+			 * of the vertex, i.e. the ones where this vertex is src. 
+			 * next.getField(1) gives the neighbor of the vertex running this ScatterFunction.
+			 */
+			if (getDirection().equals(EdgeDirection.OUT)) {
+				outValue.f0 = next.getField(1);
+			}
+			/*
+			 * When EdgeDirection is IN, the edges iterator only has the in-edges 
+			 * of the vertex, i.e. the ones where this vertex is trg. 
+			 * next.getField(10) gives the neighbor of the vertex running this ScatterFunction.
+			 */
+			else if (getDirection().equals(EdgeDirection.IN)) {
+				outValue.f0 = next.getField(0);
+			}
+			 // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges
+			if (getDirection().equals(EdgeDirection.ALL)) {
+				if (next.getField(0).equals(vertexId)) {
+					// send msg to the trg
+					outValue.f0 = next.getField(1);
+				}
+				else {
+					// send msg to the src
+					outValue.f0 = next.getField(0);
+				}
+			}
+			out.collect(outValue);
+		}
+	}
+
+	/**
+	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+	 * the next superstep will cause an exception due to a non-deliverable message.
+	 * 
+	 * @param target The key (id) of the target vertex to message.
+	 * @param m The message.
+	 */
+	public void sendMessageTo(K target, Message m) {
+		outValue.f0 = target;
+		outValue.f1 = m;
+		out.collect(outValue);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * 
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForScatterFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * 
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+
+	private Tuple2<K, Message> outValue;
+
+	private IterationRuntimeContext runtimeContext;
+
+	private Iterator<?> edges;
+
+	private Collector<Tuple2<K, Message>> out;
+
+	private K vertexId;
+
+	private EdgesIterator<K, EV> edgeIterator;
+
+	private boolean edgesUsed;
+
+	private long inDegree = -1;
+
+	private long outDegree = -1;
+
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+		this.outValue = new Tuple2<K, Message>();
+		this.edgeIterator = new EdgesIterator<K, EV>();
+	}
+
+	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
+		this.edges = edges;
+		this.out = out;
+		this.vertexId = id;
+		this.edgesUsed = false;
+	}
+
+	private static final class EdgesIterator<K, EV> 
+		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+	{
+		private Iterator<Edge<K, EV>> input;
+
+		private Edge<K, EV> edge = new Edge<K, EV>();
+
+		void set(Iterator<Edge<K, EV>> input) {
+			this.input = input;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public Edge<K, EV> next() {
+			Edge<K, EV> next = input.next();
+			edge.setSource(next.f0);
+			edge.setTarget(next.f1);
+			edge.setValue(next.f2);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+		@Override
+		public Iterator<Edge<K, EV>> iterator() {
+			return this;
+		}
+	}
+
+	/**
+	 * Retrieves the vertex in-degree (number of in-coming edges).
+	 * @return The in-degree of this vertex
+	 */
+	public long getInDegree() {
+		return inDegree;
+	}
+
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieve the vertex out-degree (number of out-going edges).
+	 * @return The out-degree of this vertex
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
+
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 3a3de64..4ac1ae1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -29,20 +29,20 @@ import java.util.List;
 /**
  * A ScatterGatherConfiguration object can be used to set the iteration name and
  * degree of parallelism, to register aggregators and use broadcast sets in
- * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
+ * the {@link GatherFunction} and {@link ScatterFunction}
  *
  * The VertexCentricConfiguration object is passed as an argument to
  * {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
- * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
+ * org.apache.flink.graph.spargel.GatherFunction, org.apache.flink.graph.spargel.ScatterFunction, int,
  * ScatterGatherConfiguration)}.
  */
 public class ScatterGatherConfiguration extends IterationConfiguration {
 
-	/** the broadcast variables for the update function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+	/** the broadcast variables for the scatter function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsScatter = new ArrayList<>();
 
-	/** the broadcast variables for the messaging function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+	/** the broadcast variables for the gather function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>();
 
 	/** flag that defines whether the degrees option is set **/
 	private boolean optDegrees = false;
@@ -53,43 +53,43 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
 	public ScatterGatherConfiguration() {}
 
 	/**
-	 * Adds a data set as a broadcast set to the messaging function.
+	 * Adds a data set as a broadcast set to the scatter function.
 	 *
-	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param name The name under which the broadcast data is available in the scatter function.
 	 * @param data The data set to be broadcasted.
 	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	public void addBroadcastSetForScatterFunction(String name, DataSet<?> data) {
+		this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data));
 	}
 
 	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
+	 * Adds a data set as a broadcast set to the gather function.
 	 *
-	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param name The name under which the broadcast data is available in the gather function.
 	 * @param data The data set to be broadcasted.
 	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) {
+		this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
 	}
 
 	/**
-	 * Get the broadcast variables of the VertexUpdateFunction.
+	 * Get the broadcast variables of the ScatterFunction.
 	 *
 	 * @return a List of Tuple2, where the first field is the broadcast variable name
 	 * and the second field is the broadcast DataSet.
 	 */
-	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
-		return this.bcVarsUpdate;
+	public List<Tuple2<String, DataSet<?>>> getScatterBcastVars() {
+		return this.bcVarsScatter;
 	}
 
 	/**
-	 * Get the broadcast variables of the MessagingFunction.
+	 * Get the broadcast variables of the GatherFunction.
 	 *
 	 * @return a List of Tuple2, where the first field is the broadcast variable name
 	 * and the second field is the broadcast DataSet.
 	 */
-	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
-		return this.bcVarsMessaging;
+	public List<Tuple2<String, DataSet<?>>> getGatherBcastVars() {
+		return this.bcVarsGather;
 	}
 
 	/**
@@ -113,7 +113,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
 	}
 
 	/**
-	 * Gets the direction in which messages are sent in the MessagingFunction.
+	 * Gets the direction in which messages are sent in the ScatterFunction.
 	 * By default the messaging direction is OUT.
 	 *
 	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
@@ -123,7 +123,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration {
 	}
 
 	/**
-	 * Sets the direction in which messages are sent in the MessagingFunction.
+	 * Sets the direction in which messages are sent in the ScatterFunction.
 	 * By default the messaging direction is OUT.
 	 *
 	 * @param direction - IN, OUT or ALL

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fc5c210..fde305f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -53,21 +53,21 @@ import java.util.Map;
  * Scatter-Gather algorithms operate on graphs, which are defined through vertices and edges. The 
  * algorithms send messages along the edges and update the state of vertices based on
  * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
+ * The computation terminates once no vertex updates its state any more.
  * Additionally, a maximum number of iterations (supersteps) may be specified.
  * <p>
  * The computation is here represented by two functions:
  * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
+ *   <li>The {@link GatherFunction} receives incoming messages and may updates the state for
  *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
  *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ *   <li>The {@link ScatterFunction} takes the new vertex state and sends messages along the outgoing
  *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
  * </ul>
  * <p>
  *
  * Scatter-Gather graph iterations are are run by calling
- * {@link Graph#runScatterGatherIteration(VertexUpdateFunction, MessagingFunction, int)}.
+ * {@link Graph#runScatterGatherIteration(ScatterFunction, GatherFunction, int)}.
  *
  * @param <K> The type of the vertex key (the vertex identifier).
  * @param <VV> The type of the vertex value (the state of the vertex).
@@ -77,47 +77,47 @@ import java.util.Map;
 public class ScatterGatherIteration<K, VV, Message, EV> 
 	implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
 {
-	private final VertexUpdateFunction<K, VV, Message> updateFunction;
+	private final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+	private final GatherFunction<K, VV, Message> gatherFunction;
 
-	private final MessagingFunction<K, VV, Message, EV> messagingFunction;
-	
 	private final DataSet<Edge<K, EV>> edgesWithValue;
-	
+
 	private final int maximumNumberOfIterations;
-	
+
 	private final TypeInformation<Message> messageType;
-	
+
 	private DataSet<Vertex<K, VV>> initialVertices;
 
 	private ScatterGatherConfiguration configuration;
 
 	// ----------------------------------------------------------------------------------
-	
-	private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> uf,
-			MessagingFunction<K, VV, Message, EV> mf,
+
+	private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf,
+			GatherFunction<K, VV, Message> gf,
 			DataSet<Edge<K, EV>> edgesWithValue, 
 			int maximumNumberOfIterations)
 	{
-		Preconditions.checkNotNull(uf);
-		Preconditions.checkNotNull(mf);
+		Preconditions.checkNotNull(sf);
+		Preconditions.checkNotNull(gf);
 		Preconditions.checkNotNull(edgesWithValue);
 		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
 
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
+		this.scatterFunction = sf;
+		this.gatherFunction = gf;
 		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;		
-		this.messageType = getMessageType(mf);
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+		this.messageType = getMessageType(sf);
 	}
-	
-	private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
-		return TypeExtractor.createTypeInfo(mf, MessagingFunction.class, mf.getClass(), 2);
+
+	private TypeInformation<Message> getMessageType(ScatterFunction<K, VV, Message, EV> mf) {
+		return TypeExtractor.createTypeInfo(mf, ScatterFunction.class, mf.getClass(), 2);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom Operator behavior
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Sets the input data set for this operator. In the case of this operator this input data set represents
 	 * the set of vertices with their initial state.
@@ -131,7 +131,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	public void setInput(DataSet<Vertex<K, VV>> inputData) {
 		this.initialVertices = inputData;
 	}
-	
+
 	/**
 	 * Creates the operator that represents this scatter-gather graph computation.
 	 * 
@@ -145,14 +145,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		// prepare some type information
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<>(keyType, messageType);
 
 		// create a graph
 		Graph<K, VV, EV> graph =
 				Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
 
 		// check whether the numVertices option is set and, if so, compute the total number of vertices
-		// and set it within the messaging and update functions
+		// and set it within the scatter and gather functions
 
 		DataSet<LongValue> numberOfVertices = null;
 		if (this.configuration != null && this.configuration.isOptNumVertices()) {
@@ -164,13 +164,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		}
 
 		if(this.configuration != null) {
-			messagingFunction.setDirection(this.configuration.getDirection());
+			scatterFunction.setDirection(this.configuration.getDirection());
 		} else {
-			messagingFunction.setDirection(EdgeDirection.OUT);
+			scatterFunction.setDirection(EdgeDirection.OUT);
 		}
 
 		// retrieve the direction in which the updates are made and in which the messages are sent
-		EdgeDirection messagingDirection = messagingFunction.getDirection();
+		EdgeDirection messagingDirection = scatterFunction.getDirection();
 
 		// check whether the degrees option is set and, if so, compute the in and the out degrees and
 		// add them to the vertex value
@@ -186,9 +186,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * a weight or distance).
 	 * 
 	 * @param edgesWithValue The data set containing edges.
-	 * @param uf The function that updates the state of the vertices from the incoming messages.
-	 * @param mf The function that turns changed vertex states into messages along the edges.
-	 * 
+	 * @param sf The function that turns changed vertex states into messages along the edges.
+	 * @param gf The function that updates the state of the vertices from the incoming messages.
+	 *
 	 * @param <K> The type of the vertex key (the vertex identifier).
 	 * @param <VV> The type of the vertex value (the state of the vertex).
 	 * @param <Message> The type of the message sent between vertices along the edges.
@@ -196,14 +196,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * 
 	 * @return An in stance of the scatter-gather graph computation operator.
 	 */
-	public static final <K, VV, Message, EV>
-			ScatterGatherIteration<K, VV, Message, EV> withEdges(
-					DataSet<Edge<K, EV>> edgesWithValue,
-					VertexUpdateFunction<K, VV, Message> uf,
-					MessagingFunction<K, VV, Message, EV> mf,
-					int maximumNumberOfIterations)
+	public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
+		DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
+		GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations)
 	{
-		return new ScatterGatherIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+		return new ScatterGatherIteration<>(sf, gf, edgesWithValue, maximumNumberOfIterations);
 	}
 
 	/**
@@ -226,23 +223,122 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+	 */
+	private static abstract class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+			extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+			implements ResultTypeQueryable<Tuple2<K, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+
+		final ScatterFunction<K, VV, Message, EV> scatterFunction;
+
+		private transient TypeInformation<Tuple2<K, Message>> resultType;
+
+
+		private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, Message, EV> scatterFunction,
+				TypeInformation<Tuple2<K, Message>> resultType)
+		{
+			this.scatterFunction = scatterFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.scatterFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.scatterFunction.init(getIterationRuntimeContext());
+			}
+			this.scatterFunction.preSuperstep();
+		}
+
+		@Override
+		public void close() throws Exception {
+			this.scatterFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<K, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ScatterUdfWithEVsSimpleVV<K, VV, Message, EV>
+			extends ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> {
+
+		private ScatterUdfWithEVsSimpleVV(ScatterFunction<K, VV, Message, EV> scatterFunction,
+				TypeInformation<Tuple2<K, Message>> resultType) {
+			super(scatterFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+							Iterable<Vertex<K, VV>> state,
+							Collector<Tuple2<K, Message>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+
+			if (stateIter.hasNext()) {
+				Vertex<K, VV> newVertexState = stateIter.next();
+				scatterFunction.set(edges.iterator(), out, newVertexState.getId());
+				scatterFunction.sendMessages(newVertexState);
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ScatterUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+			extends ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
+
+		private Vertex<K, VV> nextVertex = new Vertex<>();
+
+		private ScatterUdfWithEVsVVWithDegrees(ScatterFunction<K, VV, Message, EV> scatterFunction,
+				TypeInformation<Tuple2<K, Message>> resultType) {
+			super(scatterFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
+							Collector<Tuple2<K, Message>> out) throws Exception {
+
+			final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
+
+			if (stateIter.hasNext()) {
+				Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
+
+				nextVertex.setField(vertexWithDegrees.f0, 0);
+				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+				scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+				scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+
+				scatterFunction.set(edges.iterator(), out, vertexWithDegrees.getId());
+				scatterFunction.sendMessages(nextVertex);
+			}
+		}
+	}
+
+	private static abstract class GatherUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
 		Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
 		implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
 	{
 		private static final long serialVersionUID = 1L;
-		
-		final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
 
-		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-		
+		final GatherFunction<K, VVWithDegrees, Message> gatherFunction;
+
+		final MessageIterator<Message> messageIter = new MessageIterator<>();
+
 		private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
-		
-		
-		private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+
+
+		private GatherUdf(GatherFunction<K, VVWithDegrees, Message> gatherFunction,
 				TypeInformation<Vertex<K, VVWithDegrees>> resultType)
 		{
-			this.vertexUpdateFunction = vertexUpdateFunction;
+			this.gatherFunction = gatherFunction;
 			this.resultType = resultType;
 		}
 
@@ -250,17 +346,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		public void open(Configuration parameters) throws Exception {
 			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
 				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
-				this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+				this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
 			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+				this.gatherFunction.init(getIterationRuntimeContext());
 			}
-			this.vertexUpdateFunction.preSuperstep();
+			this.gatherFunction.preSuperstep();
 		}
-		
+
 		@Override
 		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
+			this.gatherFunction.postSuperstep();
 		}
 
 		@Override
@@ -270,10 +366,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	}
 
 	@SuppressWarnings("serial")
-	private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
+	private static final class GatherUdfSimpleVV<K, VV, Message> extends GatherUdf<K, VV, Message> {
 
-		private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
-			super(vertexUpdateFunction, resultType);
+		private GatherUdfSimpleVV(GatherFunction<K, VV, Message> gatherFunction, TypeInformation<Vertex<K, VV>> resultType) {
+			super(gatherFunction, resultType);
 		}
 
 		@Override
@@ -289,8 +385,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
 
-				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState, messageIter);
+				gatherFunction.setOutput(vertexState, out);
+				gatherFunction.updateVertex(vertexState, messageIter);
 			}
 			else {
 				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
@@ -299,7 +395,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 					try {
 						Tuple2<K, Message> next = messageIter.next();
 						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
+					} catch (Throwable ignored) {}
 					throw new Exception(message);
 				} else {
 					throw new Exception();
@@ -309,31 +405,31 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	}
 
 	@SuppressWarnings("serial")
-	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
+	private static final class GatherUdfVVWithDegrees<K, VV, Message> extends GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
 
-		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
+		private GatherUdfVVWithDegrees(GatherFunction<K, Tuple3<VV, LongValue, LongValue>, Message> gatherFunction,
 				TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
-			super(vertexUpdateFunction, resultType);
+			super(gatherFunction, resultType);
 		}
-		
+
 		@Override
 		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
 							Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
 
 			final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
-		
+
 			if (vertexIter.hasNext()) {
 				Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();
-		
+
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
 
-				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
-				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
+				gatherFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+				gatherFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
 
-				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
-				vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
+				gatherFunction.setOutputWithDegrees(vertexWithDegrees, out);
+				gatherFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
 			}
 			else {
 				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
@@ -342,7 +438,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 					try {
 						Tuple2<K, Message> next = messageIter.next();
 						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
+					} catch (Throwable ignored) {}
 					throw new Exception(message);
 				} else {
 					throw new Exception();
@@ -351,112 +447,12 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		}
 	}
 
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
-	 */
-	private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
-		extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
-		implements ResultTypeQueryable<Tuple2<K, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		final MessagingFunction<K, VV, Message, EV> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<K, Message>> resultType;
-	
-	
-		private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
-				TypeInformation<Tuple2<K, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
-				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
-				this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
-			}
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext());
-			}
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-		
-		@Override
-		public TypeInformation<Tuple2<K, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
-		extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
-
-		private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
-			TypeInformation<Tuple2<K, Message>> resultType) {
-			super(messagingFunction, resultType);
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges,
-							Iterable<Vertex<K, VV>> state,
-							Collector<Tuple2<K, Message>> out) throws Exception {
-			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
-		
-			if (stateIter.hasNext()) {
-				Vertex<K, VV> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId());
-				messagingFunction.sendMessages(newVertexState);
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
-		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
-
-		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
-
-		private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
-				TypeInformation<Tuple2<K, Message>> resultType) {
-			super(messagingFunction, resultType);
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
-				Collector<Tuple2<K, Message>> out) throws Exception {
-
-			final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
-		
-			if (stateIter.hasNext()) {
-				Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
-
-				nextVertex.setField(vertexWithDegrees.f0, 0);
-				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
-
-				messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
-				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
-
-				messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
-				messagingFunction.sendMessages(nextVertex);
-			}
-		}
-	}
-
-
 	// --------------------------------------------------------------------------------------------
 	//  UTIL methods
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Method that builds the messaging function using a coGroup operator for a simple vertex(without
+	 * Method that builds the scatter function using a coGroup operator for a simple vertex (without
 	 * degrees).
 	 * It afterwards configures the function with a custom name and broadcast variables.
 	 *
@@ -464,17 +460,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * @param messageTypeInfo
 	 * @param whereArg the argument for the where within the coGroup
 	 * @param equalToArg the argument for the equalTo within the coGroup
-	 * @return the messaging function
+	 * @return the scatter function
 	 */
-	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunction(
 			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
 			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
 			DataSet<LongValue> numberOfVertices) {
 
-		// build the messaging function (co group)
+		// build the scatter function (co group)
 		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-		MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
-				new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+		ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+				new ScatterUdfWithEVsSimpleVV<>(scatterFunction, messageTypeInfo);
 
 		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
 				.equalTo(equalToArg).with(messenger);
@@ -482,7 +478,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		// configure coGroup message function with name and broadcast variables
 		messages = messages.name("Messaging");
 		if(this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) {
 				messages = messages.withBroadcastSet(e.f1, e.f0);
 			}
 			if (this.configuration.isOptNumVertices()) {
@@ -494,7 +490,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	}
 
 	/**
-	 * Method that builds the messaging function using a coGroup operator for a vertex
+	 * Method that builds the scatter function using a coGroup operator for a vertex
 	 * containing degree information.
 	 * It afterwards configures the function with a custom name and broadcast variables.
 	 *
@@ -502,17 +498,17 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * @param messageTypeInfo
 	 * @param whereArg the argument for the where within the coGroup
 	 * @param equalToArg the argument for the equalTo within the coGroup
-	 * @return the messaging function
+	 * @return the scatter function
 	 */
-	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunctionVerticesWithDegrees(
 			DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
 			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
 			DataSet<LongValue> numberOfVertices) {
 
-		// build the messaging function (co group)
+		// build the scatter function (co group)
 		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-		MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
-				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+		ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
+				new ScatterUdfWithEVsVVWithDegrees<>(scatterFunction, messageTypeInfo);
 
 		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
 				.equalTo(equalToArg).with(messenger);
@@ -521,7 +517,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		messages = messages.name("Messaging");
 
 		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getScatterBcastVars()) {
 				messages = messages.withBroadcastSet(e.f1, e.f0);
 			}
 			if (this.configuration.isOptNumVertices()) {
@@ -543,7 +539,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		// set up the iteration operator
 		if (this.configuration != null) {
 
-			iteration.name(this.configuration.getName("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.name(this.configuration.getName("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")"));
 			iteration.parallelism(this.configuration.getParallelism());
 			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
 
@@ -554,7 +550,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		}
 		else {
 			// no configuration provided; set default name
-			iteration.name("Scatter-gather iteration (" + updateFunction + " | " + messagingFunction + ")");
+			iteration.name("Scatter-gather iteration (" + gatherFunction + " | " + scatterFunction + ")");
 		}
 	}
 
@@ -579,21 +575,21 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		switch (messagingDirection) {
 			case IN:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
+				messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
 				break;
 			case OUT:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
+				messages = buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
 				break;
 			case ALL:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
-						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+				messages = buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+						.union(buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
 				break;
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 
-		VertexUpdateUdf<K, VV, Message> updateUdf =
-				new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
+		GatherUdf<K, VV, Message> updateUdf =
+				new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes);
 
 		// build the update function (co group)
 		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
@@ -624,7 +620,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		DataSet<Tuple2<K, Message>> messages;
 
-		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+		this.gatherFunction.setOptDegrees(this.configuration.isOptDegrees());
 
 		DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
 		DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
@@ -634,7 +630,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 					@Override
 					public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
-						out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1));
+						out.collect(new Tuple3<>(first.f0, first.f1, second.f1));
 					}
 				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
 
@@ -644,9 +640,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 					@Override
 					public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
 									Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
-
-						out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
-								new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
+						out.collect(new Vertex<>(vertex.getId(),
+									new Tuple3<>(vertex.getValue(), degrees.f1, degrees.f2)));
 					}
 				}).withForwardedFieldsFirst("f0");
 
@@ -659,22 +654,22 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		switch (messagingDirection) {
 			case IN:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
+				messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
 				break;
 			case OUT:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
+				messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
 				break;
 			case ALL:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
-						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
+				messages = buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+						.union(buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
 				break;
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
-		VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
-				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+		GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
+				new GatherUdfVVWithDegrees(gatherFunction, vertexTypes);
 
 		// build the update function (co group)
 		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
@@ -690,7 +685,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 				new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {
 
 					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
-						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+						return new Vertex<>(vertex.getId(), vertex.getValue().f0);
 					}
 				});
 	}
@@ -700,7 +695,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		// configure coGroup update function with name and broadcast variables
 		updates = updates.name("Vertex State Updates");
 		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
 				updates = updates.withBroadcastSet(e.f1, e.f0);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
deleted file mode 100644
index 9085432..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * {@code <K>} The vertex key type.
- * {@code <VV>} The vertex value type.
- * {@code <Message>} The message type.
- */
-public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// --------------------------------------------------------------------------------------------
-	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-	//  inside an iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private long numberOfVertices = -1L;
-
-	/**
-	 * Retrieves the number of vertices in the graph.
-	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getNumberOfVertices() {
-		return numberOfVertices;
-	}
-
-	void setNumberOfVertices(long numberOfVertices) {
-		this.numberOfVertices = numberOfVertices;
-	}
-
-	//---------------------------------------------------------------------------------------------
-
-	private boolean optDegrees;
-
-	boolean isOptDegrees() {
-		return optDegrees;
-	}
-
-	void setOptDegrees(boolean optDegrees) {
-		this.optDegrees = optDegrees;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-	 * 
-	 * @param vertex The vertex.
-	 * @param inMessages The incoming messages to this vertex.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
-	
-	/**
-	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	/**
-	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
-	 *
-	 * This should be called at most once per updateVertex.
-	 * 
-	 * @param newValue The new vertex value.
-	 */
-	public void setNewVertexValue(VV newValue) {
-		if(setNewVertexValueCalled) {
-			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
-		}
-		setNewVertexValueCalled = true;
-		if(isOptDegrees()) {
-			outValWithDegrees.f1.f0 = newValue;
-			outWithDegrees.collect(outValWithDegrees);
-		} else {
-			outVal.setValue(newValue);
-			out.collect(outVal);
-		}
-	}
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-
-	private Collector<Vertex<K, VV>> out;
-	
-	private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
-
-	private Vertex<K, VV> outVal;
-
-	private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
-
-	private long inDegree = -1;
-
-	private long outDegree = -1;
-
-	private boolean setNewVertexValueCalled;
-
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-
-	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
-		this.outVal = outVal;
-		this.out = out;
-		setNewVertexValueCalled = false;
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
-			Collector out) {
-		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
-		this.outWithDegrees = out;
-		setNewVertexValueCalled = false;
-	}
-
-	/**
-	 * Retrieves the vertex in-degree (number of in-coming edges).
-	 * @return The in-degree of this vertex
-	 */
-	public long getInDegree() {
-		return inDegree;
-	}
-
-	void setInDegree(long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	/**
-	 * Retrieve the vertex out-degree (number of out-going edges).
-	 * @return The out-degree of this vertex
-	 */
-	public long getOutDegree() {
-		return outDegree;
-	}
-
-	void setOutDegree(long outDegree) {
-		this.outDegree = outDegree;
-	}
-
-	/**
-	 * In order to hide the Tuple3(actualValue, inDegree, OutDegree) vertex value from the user,
-	 * another function will be called from {@link org.apache.flink.graph.spargel.ScatterGatherIteration}.
-	 *
-	 * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
-	 * the regular updateVertex function.
-	 *
-	 * @param vertexState
-	 * @param inMessages
-	 * @throws Exception
-	 */
-	@SuppressWarnings("unchecked")
-	<VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
-												MessageIterator<Message> inMessages) throws Exception {
-
-		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
-				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
-
-		updateVertex(vertex, inMessages);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/918e5d0c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 3a750af..14c2fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -75,8 +75,8 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
 				DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
-						new ConnectedComponents.CCUpdater<Long, Long>(),
-						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+						new ConnectedComponents.CCUpdater<Long, Long>(), 100)
 						.getVertices();
 				
 				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
@@ -157,12 +157,12 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
 				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
-				parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-				parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+				parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar);
+				parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar);
 
 				DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
-						new ConnectedComponents.CCUpdater<Long, Long>(),
-						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO), 100)
+						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+						new ConnectedComponents.CCUpdater<Long, Long>(), 100)
 						.getVertices();
 					
 				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());


Mime
View raw message