flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [20/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
new file mode 100644
index 0000000..6d97525
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+/**
+ * An <tt>OutgoingEdge</tt> describes one edge as seen from the vertex it leaves: it carries the id of the
+ * target vertex and, optionally, a value attached to the edge (for example a weight or a distance). Edge
+ * values are only present if the graph algorithm was initialized with the
+ * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
+ * method. The source vertex id is not stored in this object.
+ * <p>
+ * Both fields are assigned through the package-private {@code set} method, so an instance may be reused
+ * for several edges (presumably by the framework; verify against the callers). Code that needs to keep the
+ * data should copy the field values rather than retain the object itself.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
+ *                    value, this type may be arbitrary.
+ */
+public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Id of the vertex this edge points to. */
+	private VertexKey target;
+
+	/** Value attached to the edge; may be null when the edges carry no values. */
+	private EdgeValue edgeValue;
+
+	/**
+	 * Returns the id of the vertex that this edge points to.
+	 *
+	 * @return The target vertex id.
+	 */
+	public VertexKey target() {
+		return this.target;
+	}
+
+	/**
+	 * Returns the value attached to this edge. Typical examples are weights, or distances of the path
+	 * represented by the edge. The value may be null if the iteration was initialized with an edge data
+	 * set without edge values.
+	 *
+	 * @return The value associated with the edge.
+	 */
+	public EdgeValue edgeValue() {
+		return this.edgeValue;
+	}
+
+	/**
+	 * Overwrites the target id and the edge value of this object in one step.
+	 *
+	 * @param target The id of the target vertex.
+	 * @param edgeValue The value attached to the edge, or null.
+	 */
+	void set(VertexKey target, EdgeValue edgeValue) {
+		this.edgeValue = edgeValue;
+		this.target = target;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
new file mode 100644
index 0000000..4f84467
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class represents iterative graph computations, programmed in a vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates its state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ *   <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
+ *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ *   considered updated.</li>
+ *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ * Vertex-centric graph iterations are instantiated by the
+ * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
+ * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
+ * the graph's edges are carrying values. The initial vertex states are supplied through
+ * {@link #setInput(DataSet)}; {@link #createResult()} then builds the delta iteration and returns the
+ * final vertex states.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+{
+	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
+	
+	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+	
+	/** Edges as (source-id, target-id) pairs; null when the iteration was built with valued edges. */
+	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
+	
+	/** Edges as (source-id, target-id, value) triples; null when the iteration was built with plain edges. */
+	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
+	
+	private final Map<String, Aggregator<?>> aggregators;
+	
+	private final int maximumNumberOfIterations;
+	
+	/** Broadcast sets (name, data) attached to the vertex update coGroup. */
+	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+	
+	/** Broadcast sets (name, data) attached to the messaging coGroup. */
+	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+	
+	/** Message type, extracted from the messaging function's generic signature (type argument index 2). */
+	private final TypeInformation<Message> messageType;
+	
+	/** The initial vertex states; null until {@link #setInput(DataSet)} has been called. */
+	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
+	
+	private String name;
+	
+	private int parallelism = -1;
+	
+	private boolean unmanagedSolutionSet;
+	
+	// ----------------------------------------------------------------------------------
+	
+	/**
+	 * Creates an iteration over edges that carry no value.
+	 *
+	 * @param uf The vertex update function.
+	 * @param mf The messaging function.
+	 * @param edgesWithoutValue The edges as 2-tuples (source-id, target-id).
+	 * @param maximumNumberOfIterations The maximum number of supersteps; must be at least one.
+	 */
+	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+			int maximumNumberOfIterations)
+	{
+		Validate.notNull(uf);
+		Validate.notNull(mf);
+		Validate.notNull(edgesWithoutValue);
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		
+		// check that the edges are actually a valid tuple set of vertex key types
+		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
+		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
+		
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+			"Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+		
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithoutValue = edgesWithoutValue;
+		this.edgesWithValue = null;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+		this.aggregators = new HashMap<String, Aggregator<?>>();
+		
+		this.messageType = getMessageType(mf);
+	}
+	
+	/**
+	 * Creates an iteration over edges that carry a value.
+	 *
+	 * @param uf The vertex update function.
+	 * @param mf The messaging function.
+	 * @param edgesWithValue The edges as 3-tuples (source-id, target-id, edge-value).
+	 * @param maximumNumberOfIterations The maximum number of supersteps; must be at least one.
+	 * @param edgeHasValueMarker Unused; it only gives this constructor a signature that differs from the
+	 *                           plain-edge constructor after generic type erasure.
+	 */
+	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+			int maximumNumberOfIterations,
+			boolean edgeHasValueMarker)
+	{
+		Validate.notNull(uf);
+		Validate.notNull(mf);
+		Validate.notNull(edgesWithValue);
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		
+		// check that the edges are actually a valid tuple set of vertex key types
+		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
+		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
+		
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+			"The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+		
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithoutValue = null;
+		this.edgesWithValue = edgesWithValue;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+		this.aggregators = new HashMap<String, Aggregator<?>>();
+		
+		this.messageType = getMessageType(mf);
+	}
+	
+	/**
+	 * Extracts the message type from the concrete class of the given messaging function.
+	 *
+	 * @param mf The messaging function.
+	 * @return The type information of the messages it sends.
+	 */
+	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
+		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
+	}
+	
+	/**
+	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * 
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 * @param aggregator The aggregator.
+	 */
+	public void registerAggregator(String name, Aggregator<?> aggregator) {
+		this.aggregators.put(name, aggregator);
+	}
+	
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+	
+	/**
+	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+	 * 
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+	
+	/**
+	 * Gets the name from this vertex-centric iteration.
+	 * 
+	 * @return The name of the iteration.
+	 */
+	public String getName() {
+		return name;
+	}
+	
+	/**
+	 * Sets the degree of parallelism for the iteration.
+	 * 
+	 * @param parallelism The degree of parallelism.
+	 */
+	public void setParallelism(int parallelism) {
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+	
+	/**
+	 * Gets the iteration's degree of parallelism.
+	 * 
+	 * @return The iterations parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+	
+	/**
+	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
+	 */
+	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
+		this.unmanagedSolutionSet = unmanaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory, false otherwise.
+	 */
+	public boolean isSolutionSetUnmanagedMemory() {
+		return this.unmanagedSolutionSet;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Sets the input data set for this operator. In the case of this operator this input data set represents
+	 * the set of vertices with their initial state.
+	 * 
+	 * @param inputData The input data set, which in the case of this operator represents the set of
+	 *                  vertices with their initial state.
+	 * @throws NullPointerException if {@code inputData} is null.
+	 * @throws IllegalArgumentException if the input does not consist of 2-tuples, or if its key type differs
+	 *                                  from the key type of the edges.
+	 * 
+	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+	 */
+	@Override
+	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
+		Validate.notNull(inputData, "The input data set (the initial vertices) must not be null.");
+		
+		// sanity check that we really have two tuples
+		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
+		Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
+
+		// check that the key type here is the same as for the edges
+		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
+		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
+		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
+		
+		Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
+				"must be the same data type as the first fields of the edge data set (the source vertex id). " +
+				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
+
+		this.initialVertices = inputData;
+	}
+	
+	/**
+	 * Creates the operator that represents this vertex-centric graph computation.
+	 * <p>
+	 * The plan is a delta iteration whose solution set and initial workset are both the initial vertices.
+	 * Per superstep, the workset (changed vertices) is coGrouped with the edges to produce messages, and the
+	 * messages are coGrouped with the solution set to produce updated vertex states, which form both the
+	 * solution set delta and the next workset.
+	 * 
+	 * @return The operator that represents this vertex-centric graph computation.
+	 * @throws IllegalStateException if {@link #setInput(DataSet)} has not been called.
+	 */
+	@Override
+	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
+		if (this.initialVertices == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+		
+		// prepare some type information
+		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) vertexTypes).getTypeAt(0);
+		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
+		
+		// set up the iteration operator
+		final String iterationName = (this.name != null) ? this.name :
+			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
+		final int[] zeroKeyPos = new int[] {0};
+	
+		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
+			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+		iteration.name(iterationName);
+		iteration.parallelism(parallelism);
+		iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
+		
+		// register all aggregators
+		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
+			iteration.registerAggregator(entry.getKey(), entry.getValue());
+		}
+		
+		// build the messaging function (co group): edges are keyed by source id, the workset by vertex id
+		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
+		if (edgesWithoutValue != null) {
+			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
+			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+		}
+		else {
+			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
+			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+		}
+		
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
+			messages = messages.withBroadcastSet(e.f1, e.f0);
+		}
+		
+		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+		
+		// build the update function (co group): messages are keyed by target id, the solution set by vertex id
+		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+		
+		// configure coGroup update function with name and broadcast variables
+		updates = updates.name("Vertex State Updates");
+		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
+			updates = updates.withBroadcastSet(e.f1, e.f0);
+		}
+
+		// let the operator know that we preserve the key field
+		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+		
+		// the updated vertices are both the solution set delta and the next workset
+		return iteration.closeWith(updates, updates);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Constructor builders to avoid signature conflicts with generic type erasure
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
+	 * 
+	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
+	 * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
+	 * @param messagingFunction The function that turns changed vertex states into messages along the edges.
+	 * @param maximumNumberOfIterations The maximum number of iterations (supersteps); must be at least one.
+	 * 
+	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
+	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * 
+	 * @return An instance of the vertex-centric graph computation operator.
+	 */
+	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
+			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
+					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+						int maximumNumberOfIterations)
+	{
+		// the edge value type is irrelevant for plain edges, so the wildcard is pinned to Object
+		@SuppressWarnings("unchecked")
+		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
+								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
+		
+		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
+	}
+	
+	/**
+	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
+	 * a weight or distance).
+	 * 
+	 * @param edgesWithValue The data set containing edges. Edges are represented as 3-tuples: (source-id, target-id, edge-value)
+	 * @param uf The function that updates the state of the vertices from the incoming messages.
+	 * @param mf The function that turns changed vertex states into messages along the edges.
+	 * @param maximumNumberOfIterations The maximum number of iterations (supersteps); must be at least one.
+	 * 
+	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
+	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * @param <EdgeValue> The type of the values that are associated with the edges.
+	 * 
+	 * @return An instance of the vertex-centric graph computation operator.
+	 */
+	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
+					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+					int maximumNumberOfIterations)
+	{
+		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+	
+	/*
+	 * UDF that wraps the vertex update function: it coGroups the messages of one vertex with that
+	 * vertex's entry in the solution set.
+	 */
+	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
+		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+
+		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+		
+		// transient: only needed on the client while the plan is built
+		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
+		
+		
+		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
+		{
+			this.vertexUpdateFunction = vertexUpdateFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
+				Collector<Tuple2<VertexKey, VertexValue>> out)
+			throws Exception
+		{
+			final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+			
+			if (vertexIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
+				
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+				
+				vertexUpdateFunction.setOutput(vertexState, out);
+				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
+			}
+			else {
+				// messages were addressed to a vertex id that has no entry in the solution set
+				final Iterator<Tuple2<VertexKey, Message>> unmatchedMessages = messages.iterator();
+				if (unmatchedMessages.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Tuple2<VertexKey, Message> next = unmatchedMessages.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!.";
+					} catch (Exception ignored) {
+						// best effort only: keep the generic message if the key cannot be read or printed
+					}
+					throw new Exception(message);
+				} else {
+					throw new Exception("Neither a vertex state nor any message exists for this key.");
+				}
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// the wrapped function is initialized once, in the first superstep
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
+			return this.resultType;
+		}
+	}
+	
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
+	 */
+	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
+		extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
+		
+		// transient: only needed on the client while the plan is built
+		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+		
+		
+		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+				TypeInformation<Tuple2<VertexKey, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+		
+		@Override
+		public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
+				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+			throws Exception
+		{
+			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+			
+			// only vertices present in the workset (i.e. changed in the last superstep) send messages
+			if (stateIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext(), false);
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+	
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+	 */
+	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
+		extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+		
+		// transient: only needed on the client while the plan is built
+		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+		
+		
+		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+				TypeInformation<Tuple2<VertexKey, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
+				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+			throws Exception
+		{
+			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+			
+			// only vertices present in the workset (i.e. changed in the last superstep) send messages
+			if (stateIter.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext(), true);
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+		
+		@Override
+		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
new file mode 100644
index 0000000..a5548b7
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for the user function that computes the new state of a vertex from its previous state
+ * and the messages it received. Subclasses implement
+ * {@link #updateVertex(Comparable, Object, MessageIterator)}, which is invoked once per vertex in each
+ * superstep.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal state, populated via the package-private init(...) and setOutput(...) methods
+	// --------------------------------------------------------------------------------------------
+
+	/** Runtime context of the enclosing iteration (superstep number, aggregators, broadcast sets). */
+	private IterationRuntimeContext runtimeContext;
+
+	/** Receives the vertex states emitted by {@link #setNewVertexValue(Object)}. */
+	private Collector<Tuple2<VertexKey, VertexValue>> out;
+
+	/** Reused output tuple; {@link #setNewVertexValue(Object)} only overwrites its value field {@code f1}. */
+	private Tuple2<VertexKey, VertexValue> outVal;
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Computes the new state of a single vertex. This method is invoked once per vertex per superstep
+	 * with the vertex's current state and its incoming messages. A new state is published by calling
+	 * {@link #setNewVertexValue(Object)}; doing so triggers the sending of messages from this vertex
+	 * via the {@link MessagingFunction}.
+	 *
+	 * @param vertexKey The key (identifier) of the vertex.
+	 * @param vertexValue The current value (state) of the vertex.
+	 * @param inMessages The messages sent to this vertex.
+	 *
+	 * @throws Exception Any exception thrown here causes the superstep to fail.
+	 */
+	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+
+	/**
+	 * Hook executed once per superstep, before {@code updateVertex} is invoked for any vertex.
+	 * The default implementation does nothing.
+	 *
+	 * @throws Exception Any exception thrown here causes the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+
+	/**
+	 * Hook executed once per superstep, after {@code updateVertex} has been invoked for every vertex.
+	 * The default implementation does nothing.
+	 *
+	 * @throws Exception Any exception thrown here causes the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+
+	/**
+	 * Publishes a new value for the vertex currently being updated. Publishing a value triggers the
+	 * sending of outgoing messages from this vertex.
+	 *
+	 * @param newValue The new vertex value.
+	 */
+	public void setNewVertexValue(VertexValue newValue) {
+		final Tuple2<VertexKey, VertexValue> result = outVal;
+		result.f1 = newValue;
+		out.collect(result);
+	}
+
+	/**
+	 * Returns the number of the current superstep. Numbering starts at <tt>1</tt>.
+	 *
+	 * @return The current superstep number.
+	 */
+	public int getSuperstepNumber() {
+		return runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Looks up the iteration aggregator registered under the given name. An iteration aggregator
+	 * combines all aggregates globally once per superstep and makes the result available in the
+	 * next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Returns the value that the named aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Returns the broadcast data set registered under the given name. Broadcast data sets are available
+	 * on all parallel instances of a function. They are registered via
+	 * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+
+	/** Installs the iteration runtime context used by the accessor methods above. */
+	void init(IterationRuntimeContext ctx) {
+		this.runtimeContext = ctx;
+	}
+
+	/** Installs the reusable output tuple and the collector used by {@link #setNewVertexValue(Object)}. */
+	void setOutput(Tuple2<VertexKey, VertexValue> reuseTuple, Collector<Tuple2<VertexKey, VertexValue>> collector) {
+		this.outVal = reuseTuple;
+		this.out = collector;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
new file mode 100644
index 0000000..7574dab
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Connected components with the vertex-centric (Spargel) API. Every vertex starts with its own id as
+ * component id, repeatedly adopts the smallest id it receives from a neighbor, and forwards changed
+ * ids to all of its neighbors.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class SpargelConnectedComponents {
+
+	public static void main(String[] args) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// vertices 0..10 and a small, fixed set of edges
+		final DataSet<Long> vertexIds = env.generateSequence(0, 10);
+		final DataSet<Tuple2<Long, Long>> edges = env.fromElements(
+				new Tuple2<Long, Long>(0L, 2L),
+				new Tuple2<Long, Long>(2L, 4L),
+				new Tuple2<Long, Long>(4L, 8L),
+				new Tuple2<Long, Long>(1L, 5L),
+				new Tuple2<Long, Long>(3L, 7L),
+				new Tuple2<Long, Long>(3L, 9L));
+
+		// each vertex initially forms its own component: (id, id)
+		final DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+		final int maxIterations = 100;
+		final DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(
+				VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), maxIterations));
+
+		result.print();
+		env.execute("Spargel Connected Components");
+	}
+
+	/**
+	 * Adopts the smallest component id received from the neighbors, if it is smaller than the
+	 * vertex's current component id.
+	 */
+	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long smallestOffered = Long.MAX_VALUE;
+			for (long candidate : inMessages) {
+				if (candidate < smallestOffered) {
+					smallestOffered = candidate;
+				}
+			}
+
+			// keep the current component id unless a neighbor offered a smaller one
+			if (smallestOffered >= vertexValue) {
+				return;
+			}
+			setNewVertexValue(smallestOffered);
+		}
+	}
+
+	/** Sends the vertex's current component id to all of its neighbors. */
+	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
+		@Override
+		public void sendMessages(Long vertexId, Long componentId) {
+			sendMessageToAllNeighbors(componentId);
+		}
+	}
+
+	/**
+	 * A map function that turns a Long value into a 2-tuple:
+	 * <pre>(Long value) -> (value, value)</pre>
+	 */
+	public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Long value) {
+			final Tuple2<Long, Long> pair = new Tuple2<Long, Long>(value, value);
+			return pair;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
new file mode 100644
index 0000000..fccb195
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * The sample graph is generated with {@link Math#random()}, so results differ between runs.
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRank {
+
+	/** Damping factor: weight of the rank received over edges versus the uniform random jump. */
+	private static final double BETA = 0.85;
+
+
+	public static void main(String[] args) throws Exception {
+		final int numVertices = 100;
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// enumerate the vertices and assign each an initial uniform probability (rank)
+		DataSet<Tuple2<Long, Double>> initialRanks = env.generateSequence(1, numVertices)
+								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
+									@Override
+									public Tuple2<Long, Double> map(Long value) {
+										return new Tuple2<Long, Double>(value, 1.0 / numVertices);
+									}
+								});
+
+		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+									@Override
+									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+										int numOutEdges = (int) (Math.random() * (numVertices / 2));
+										for (int i = 0; i < numOutEdges; i++) {
+											long target = (long) (Math.random() * numVertices) + 1;
+											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0 / numOutEdges));
+										}
+									}
+								});
+
+		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(
+			VertexCentricIteration.withValuedEdges(edgesWithProbability,
+						new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
+
+		result.print();
+		env.execute("Spargel PageRank");
+	}
+
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+	 * and then applying the damping formula {@code beta * sum + (1 - beta) / numVertices}.
+	 */
+	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		private final long numVertices;
+		private final double beta;
+
+		/**
+		 * @param numVertices The total number of vertices in the graph.
+		 * @param beta The damping factor applied to the rank received from neighbors.
+		 */
+		public VertexRankUpdater(long numVertices, double beta) {
+			this.numVertices = numVertices;
+			this.beta = beta;
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+
+			// apply the damping factor / random jump. Both terms must use the configured beta
+			// (not the BETA constant), otherwise a non-default beta is only half applied.
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
+	 * which is associated with an edge as the edge value.
+	 */
+	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Long vertexId, Double newRank) {
+			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.target(), newRank * edge.edgeValue());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
new file mode 100644
index 0000000..ae4ee95
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * The number of vertices is not passed to the functions as a constant. It is computed from the
+ * vertex data set and made available to them as a broadcast set named {@code "count"}.
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRankCountingVertices {
+
+	/** Damping factor: weight of the rank received over edges versus the uniform random jump. */
+	private static final double BETA = 0.85;
+
+
+	public static void main(String[] args) throws Exception {
+		final int NUM_VERTICES = 100;
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// a list of vertices
+		DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
+
+		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+									@Override
+									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
+										for (int i = 0; i < numOutEdges; i++) {
+											long target = (long) (Math.random() * NUM_VERTICES) + 1;
+											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0 / numOutEdges));
+										}
+									}
+								});
+
+		// ---------- start of the algorithm ---------------
+
+		// count the number of vertices
+		DataSet<Long> count = vertices
+			.map(new MapFunction<Long, Long>() {
+				@Override
+				public Long map(Long value) {
+					return 1L;
+				}
+			})
+			.reduce(new ReduceFunction<Long>() {
+				@Override
+				public Long reduce(Long value1, Long value2) {
+					return value1 + value2;
+				}
+			});
+
+		// assign each vertex an initial uniform probability (rank) of 1 / vertex count
+		DataSet<Tuple2<Long, Double>> initialRanks = vertices
+			.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
+
+				private long numVertices;
+
+				@Override
+				public void open(Configuration parameters) {
+					numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
+				}
+
+				@Override
+				public Tuple2<Long, Double> map(Long value) {
+					return new Tuple2<Long, Double>(value, 1.0 / numVertices);
+				}
+			}).withBroadcastSet(count, "count");
+
+
+		VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
+				new VertexRankUpdater(BETA), new RankMessenger(), 20);
+		iteration.addBroadcastSetForUpdateFunction("count", count);
+
+
+		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(iteration);
+
+		result.print();
+		env.execute("Spargel PageRank");
+	}
+
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+	 * and then applying the damping formula {@code beta * sum + (1 - beta) / numVertices}. The vertex
+	 * count is read from the broadcast set {@code "count"} before every superstep.
+	 */
+	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		private final double beta;
+		private long numVertices;
+
+		/**
+		 * @param beta The damping factor applied to the rank received from neighbors.
+		 */
+		public VertexRankUpdater(double beta) {
+			this.beta = beta;
+		}
+
+		@Override
+		public void preSuperstep() {
+			numVertices = this.<Long>getBroadcastSet("count").iterator().next();
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+
+			// apply the damping factor / random jump. Both terms must use the configured beta
+			// (not the BETA constant), otherwise a non-default beta is only half applied.
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
+	 * which is associated with an edge as the edge value.
+	 */
+	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Long vertexId, Double newRank) {
+			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.target(), newRank * edge.edgeValue());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
new file mode 100644
index 0000000..0c39688
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Value;
+
+
+/**
+ * Mutable holder for one outgoing edge in the record-based Spargel API: the key of the target vertex
+ * and the value attached to the edge. Instances may be reused; both fields are overwritten through
+ * the package-private {@code set} method.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <EdgeValue> The edge value type.
+ */
+public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
+
+	/** Key of the vertex this edge points to. */
+	private VertexKey target;
+
+	/** Value associated with this edge. */
+	private EdgeValue edgeValue;
+
+	/** Overwrites the target key and the edge value of this holder. */
+	void set(VertexKey newTarget, EdgeValue newEdgeValue) {
+		this.edgeValue = newEdgeValue;
+		this.target = newTarget;
+	}
+
+	/** @return The key of the target vertex. */
+	public VertexKey target() {
+		return this.target;
+	}
+
+	/** @return The value associated with this edge. */
+	public EdgeValue edgeValue() {
+		return this.edgeValue;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
new file mode 100644
index 0000000..03022d2
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.util.Iterator;
+
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+
+/**
+ * Iterator over the messages sent to a vertex in the record-based Spargel API. Each message is read
+ * from field 1 of the underlying record into a single reused instance, so every call to
+ * {@link #next()} overwrites the object returned by the previous call.
+ * <p>
+ * The class is its own {@link Iterable}: {@link #iterator()} returns {@code this}, so each source set
+ * via {@link #setSource(Iterator)} can be traversed only once.
+ *
+ * @param <Message> The message type.
+ */
+public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
+
+	/** Reused object into which every message is deserialized. */
+	private final Message reuse;
+
+	/** Records holding the messages of the current vertex. */
+	private Iterator<Record> records;
+
+	public MessageIterator(Message instance) {
+		this.reuse = instance;
+	}
+
+	/** Sets the records to iterate over; must be called before iterating. */
+	public final void setSource(Iterator<Record> source) {
+		this.records = source;
+	}
+
+	@Override
+	public final boolean hasNext() {
+		return records.hasNext();
+	}
+
+	@Override
+	public final Message next() {
+		final Record current = records.next();
+		current.getFieldInto(1, reuse);
+		return reuse;
+	}
+
+	/** Not supported: messages cannot be removed. */
+	@Override
+	public final void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<Message> iterator() {
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
new file mode 100644
index 0000000..794dce5
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for the user function that sends messages from a vertex in the record-based Spargel API.
+ * {@link #sendMessages(Key, Value)} is called with a vertex's key and value. It can send messages either
+ * to all neighbors at once via {@link #sendMessageToAllNeighbors(Value)}, or to individual vertices via
+ * {@link #sendMessageTo(Key, Value)}, e.g. while iterating {@link #getOutgoingEdges()}. The outgoing
+ * edges of a vertex can be consumed only once, through exactly one of those two methods.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ * @param <EdgeValue> The edge value type.
+ */
+public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sends messages on behalf of the given vertex.
+	 *
+	 * @param vertexKey The key of the vertex.
+	 * @param vertexValue The value (state) of the vertex.
+	 * @throws Exception Exceptions thrown by the user code.
+	 */
+	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+
+	/** Configuration hook; the default implementation does nothing. */
+	public void setup(Configuration config) throws Exception {}
+
+	/** Pre-superstep hook; the default implementation does nothing. */
+	public void preSuperstep() throws Exception {}
+
+	/** Post-superstep hook; the default implementation does nothing. */
+	public void postSuperstep() throws Exception {}
+
+
+	/**
+	 * Returns an iterator over the outgoing edges of the current vertex. The returned iterator and the
+	 * {@link Edge} objects it produces are reused, so copy any value that must outlive the next call.
+	 *
+	 * @return An iterator over the outgoing edges of the current vertex.
+	 * @throws IllegalStateException If the edges of the current vertex were already consumed through
+	 *         this method or through {@link #sendMessageToAllNeighbors(Value)}.
+	 */
+	public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
+		markEdgesUsed();
+		edgeIter.set(edges);
+		return edgeIter;
+	}
+
+	/**
+	 * Sends the given message to every neighbor reachable over an outgoing edge of the current vertex.
+	 *
+	 * @param m The message to send.
+	 * @throws IllegalStateException If the edges of the current vertex were already consumed through
+	 *         this method or through {@link #getOutgoingEdges()}.
+	 */
+	public void sendMessageToAllNeighbors(Message m) {
+		markEdgesUsed();
+		while (edges.hasNext()) {
+			Record next = edges.next();
+			// NOTE(review): the neighbor key is read from field 1 here, whereas EdgesIterator#next() reads
+			// the target key from field 0 and the edge value from field 1. The two disagree about the
+			// edge record layout -- confirm against the edge schema used by SpargelIteration.
+			VertexKey k = next.getField(1, this.keyClass);
+			outValue.setField(0, k);
+			outValue.setField(1, m);
+			out.collect(outValue);
+		}
+	}
+
+	/**
+	 * Sends a message to the vertex with the given key.
+	 *
+	 * @param target The key of the receiving vertex.
+	 * @param m The message to send.
+	 */
+	public void sendMessageTo(VertexKey target, Message m) {
+		outValue.setField(0, target);
+		outValue.setField(1, m);
+		out.collect(outValue);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** @return The number of the current superstep, as reported by the iteration runtime context. */
+	public int getSuperstep() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * @param name The name of the aggregator.
+	 * @return The iteration aggregator registered under the given name.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * @param name The name of the aggregator.
+	 * @return The value the named aggregator computed in the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+
+	/** Reused output record: field 0 = receiving vertex key, field 1 = message. */
+	private Record outValue;
+
+	private IterationRuntimeContext runtimeContext;
+
+	/** Edge records of the current vertex; a plain iterator, so it can be consumed only once. */
+	private Iterator<Record> edges;
+
+	private Collector<Record> out;
+
+	private EdgesIterator<VertexKey, EdgeValue> edgeIter;
+
+	private Class<VertexKey> keyClass;
+
+	/** Whether the edges of the current vertex have already been handed out; reset by {@link #set}. */
+	private boolean edgesUsed;
+
+	/**
+	 * Marks the edges of the current vertex as consumed.
+	 *
+	 * @throws IllegalStateException If they were already consumed for the current vertex.
+	 */
+	private void markEdgesUsed() {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
+		}
+		edgesUsed = true;
+	}
+
+	@SuppressWarnings("unchecked")
+	void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
+		this.runtimeContext = context;
+		this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
+		this.outValue = new Record();
+		// the holder's runtime class is used to instantiate keys read by sendMessageToAllNeighbors
+		this.keyClass = (Class<VertexKey>) keyHolder.getClass();
+	}
+
+	/** Installs the edges and the output collector for the next vertex and resets the edge-use flag. */
+	void set(Iterator<Record> edges, Collector<Record> out) {
+		this.edges = edges;
+		this.out = out;
+		this.edgesUsed = false;
+	}
+
+	/**
+	 * Adapts the edge records of a vertex to {@link Edge} objects. The same key holder, value holder and
+	 * {@link Edge} instance are reused for every element.
+	 */
+	private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
+
+		private Iterator<Record> input;
+		private VertexKey keyHolder;
+		private EdgeValue edgeValueHolder;
+
+		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
+
+		EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
+			this.keyHolder = keyHolder;
+			this.edgeValueHolder = edgeValueHolder;
+		}
+
+		void set(Iterator<Record> input) {
+			this.input = input;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public Edge<VertexKey, EdgeValue> next() {
+			Record next = input.next();
+			next.getFieldInto(0, keyHolder);
+			next.getFieldInto(1, edgeValueHolder);
+			edge.set(keyHolder, edgeValueHolder);
+			return edge;
+		}
+
+		/** Not supported: edges cannot be removed. */
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
new file mode 100644
index 0000000..f647e5d
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.ReflectionUtil;
+
+/**
+ * Builds a Spargel vertex-centric iteration on top of the (deprecated) record API.
+ * <p>
+ * The iteration is a {@link DeltaIteration} whose step function consists of two co-groups. Both
+ * co-groups are keyed on field 0 of their inputs:
+ * <ul>
+ *   <li>"Message Sender" co-groups the edges (first input) with the workset (second input). It
+ *       runs the {@link MessagingFunction} for every vertex found in the workset.</li>
+ *   <li>"Vertex Updater" co-groups the produced messages (first input) with the solution set
+ *       (second input). It runs the {@link VertexUpdateFunction}.</li>
+ * </ul>
+ * The output of the vertex updater serves both as the next workset and as the solution set delta.
+ */
+@SuppressWarnings("deprecation")
+public class SpargelIteration {
+	
+	private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
+	
+	private final DeltaIteration iteration;
+	
+	// types extracted from the generic signature of the messaging function
+	private final Class<? extends Key<?>> vertexKey;
+	private final Class<? extends Value> vertexValue;
+	private final Class<? extends Value> messageType;
+	private final Class<? extends Value> edgeValue;
+	
+	private final CoGroupOperator vertexUpdater;
+	private final CoGroupOperator messager;
+	
+	
+	// ----------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a vertex-centric iteration named {@code "<unnamed vertex-centric iteration>"}.
+	 *
+	 * @param mf the messaging function; its type arguments determine the key, value, message and edge types
+	 * @param uf the vertex update function
+	 * @throws RuntimeException if the types cannot be determined or the functions cannot be serialized
+	 */
+	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
+			SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
+	{
+		this(mf, uf, DEFAULT_NAME);
+	}
+	
+	/**
+	 * Creates a named vertex-centric iteration.
+	 *
+	 * @param mf the messaging function; its type arguments determine the key, value, message and edge types
+	 * @param uf the vertex update function
+	 * @param name the name of the underlying delta iteration
+	 * @throws RuntimeException if the types cannot be determined or the functions cannot be serialized
+	 */
+	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			String name)
+	{
+		// get the types from the generic signature of the messaging function's class
+		this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
+		this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
+		this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
+		this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
+		
+		if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
+			throw new RuntimeException("Could not determine the vertex key, vertex value, message, and edge value types "
+					+ "from the type parameters of the MessagingFunction " + mf.getClass().getName() + ".");
+		}
+	
+		// instantiate the data flow
+		this.iteration = new DeltaIteration(0, name);
+		
+		this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
+			.input2(iteration.getWorkset())
+			.name("Message Sender")
+			.build();
+		this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
+			.input1(messager)
+			.input2(iteration.getSolutionSet())
+			.name("Vertex Updater")
+			.build();
+		
+		iteration.setNextWorkset(vertexUpdater);
+		iteration.setSolutionSetDelta(vertexUpdater);
+		
+		// parameterize the data flow: ship the serialized UDFs and their type classes to the drivers
+		try {
+			Configuration vertexUdfParams = vertexUpdater.getParameters();
+			InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
+			vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
+			vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
+			vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
+			
+			Configuration messageUdfParams = messager.getParameters();
+			InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
+			messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
+			messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
+			messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
+			messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Could not serialize the UDFs for distribution" + 
+					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+		}
+	}
+	
+	// ----------------------------------------------------------------------------------
+	//  inputs and outputs
+	// ----------------------------------------------------------------------------------
+	
+	/**
+	 * Sets the vertex records. They are used as both the initial solution set and the initial workset.
+	 */
+	public void setVertexInput(Operator<Record> c) {
+		this.iteration.setInitialSolutionSet(c);
+		this.iteration.setInitialWorkset(c);
+	}
+	
+	/**
+	 * Sets the edge records. They become the first input of the "Message Sender" co-group.
+	 */
+	public void setEdgesInput(Operator<Record> c) {
+		this.messager.setFirstInput(c);
+	}
+	
+	/**
+	 * Returns the underlying delta iteration operator, to be used as input of downstream operators.
+	 */
+	public Operator<?> getOutput() {
+		return this.iteration;
+	}
+	
+	/** Sets the degree of parallelism of the underlying delta iteration. */
+	public void setDegreeOfParallelism(int dop) {
+		this.iteration.setDegreeOfParallelism(dop);
+	}
+	
+	/** Sets the maximum number of iterations (supersteps) of the underlying delta iteration. */
+	public void setNumberOfIterations(int iterations) {
+		this.iteration.setMaximumNumberOfIterations(iterations);
+	}
+	
+	/** Returns the aggregator registry of the underlying delta iteration. */
+	public AggregatorRegistry getAggregators() {
+		return this.iteration.getAggregators();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Adapts a {@link VertexUpdateFunction} to the record-API co-group.
+	 * <p>
+	 * Each group pairs the messages for one vertex (first input) with that vertex's record from the
+	 * solution set (second input). The vertex record is reused as the output record.
+	 * <p>
+	 * The annotation declares field 0 (the vertex key) of the first input as unchanged in the output.
+	 */
+	@ConstantFieldsFirst(0)
+	public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
+		
+		private static final long serialVersionUID = 1L;
+		
+		private static final String UDF_PARAM = "spargel.udf";
+		private static final String KEY_PARAM = "spargel.key-type";
+		private static final String VALUE_PARAM = "spargel.value-type";
+		private static final String MESSAGE_PARAM = "spargel.message-type";
+		
+		private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
+		
+		// reused holders, created on the first open()
+		private K vertexKey;
+		private V vertexValue;
+		private MessageIterator<M> messageIter;
+
+		@Override
+		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
+			
+			if (vertex.hasNext()) {
+				Record first = vertex.next();
+				first.getFieldInto(0, vertexKey);
+				first.getFieldInto(1, vertexValue);
+				messageIter.setSource(messages);
+				// the vertex record doubles as output record; setNewVertexValue() only replaces field 1
+				vertexUpdateFunction.setOutput(first, out);
+				vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
+			} else {
+				// there is no vertex record for this key in the solution set
+				if (messages.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Record next = messages.next();
+						next.getFieldInto(0, vertexKey);
+						message = "Target vertex '" + vertexKey + "' does not exist!.";
+					} catch (Exception ignored) {
+						// best effort only: keep the generic message if the key cannot be read
+					}
+					throw new Exception(message);
+				} else {
+					throw new Exception("Vertex update called with neither a vertex nor any messages.");
+				}
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// instantiate only the first time; every open() still triggers preSuperstep()
+			if (vertexUpdateFunction == null) {
+				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+				
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
+				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, cl);
+				
+				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+				messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
+				
+				try {
+					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, cl);
+				} catch (Exception e) {
+					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+					throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
+				}
+				
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+				this.vertexUpdateFunction.setup(parameters);
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+	}
+	
+	/**
+	 * Adapts a {@link MessagingFunction} to the record-API co-group.
+	 * <p>
+	 * Each group pairs the edges of one vertex (first input) with that vertex's record from the
+	 * workset (second input). Groups without a workset record produce no messages.
+	 */
+	public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
+
+		private static final long serialVersionUID = 1L;
+		
+		private static final String UDF_PARAM = "spargel.udf";
+		private static final String KEY_PARAM = "spargel.key-type";
+		private static final String VALUE_PARAM = "spargel.value-type";
+		// written by SpargelIteration; not read by this driver
+		private static final String MESSAGE_PARAM = "spargel.message-type";
+		private static final String EDGE_PARAM = "spargel.edge-value";
+		
+		
+		private MessagingFunction<K, V, M, E> messagingFunction;
+		
+		// reused holders, created on the first open()
+		private K vertexKey;
+		private V vertexValue;
+		
+		@Override
+		public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
+			if (state.hasNext()) {
+				Record first = state.next();
+				first.getFieldInto(0, vertexKey);
+				first.getFieldInto(1, vertexValue);
+				messagingFunction.set(edges, out);
+				messagingFunction.sendMessages(vertexKey, vertexValue);
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// instantiate only the first time; every open() still triggers preSuperstep()
+			if (messagingFunction == null) {
+				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+				
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
+				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, cl);
+				
+				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+				
+				// separate holders used by the messaging function when iterating the edges
+				K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
+				
+				try {
+					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, cl);
+				} catch (Exception e) {
+					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+					throw new Exception("Could not instantiate MessagingFunction" + message, e);
+				}
+				
+				this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
+				this.messagingFunction.setup(parameters);
+			}
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
new file mode 100644
index 0000000..299b898
--- /dev/null
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for the vertex update logic of a record-API Spargel iteration.
+ * <p>
+ * {@link #updateVertex} is invoked for a vertex together with an iterator over the messages sent
+ * to it. An implementation emits a changed vertex by calling {@link #setNewVertexValue}.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Processes one vertex and the messages it received.
+	 *
+	 * @param vertexKey the key of the vertex
+	 * @param vertexValue the current value of the vertex
+	 * @param inMessages the messages sent to the vertex
+	 * @throws Exception any exception aborts the iteration
+	 */
+	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+	
+	/**
+	 * Hook called once, when the function is first opened, with the operator's configuration.
+	 * Does nothing by default.
+	 */
+	public void setup(Configuration config) throws Exception {}
+	
+	/** Hook called before each superstep. Does nothing by default. */
+	public void preSuperstep() throws Exception {}
+	
+	/** Hook called after each superstep. Does nothing by default. */
+	public void postSuperstep() throws Exception {}
+	
+	/**
+	 * Emits the current vertex with a new value. Field 1 of the current vertex record is replaced
+	 * by {@code newValue} and the record is collected.
+	 * <p>
+	 * This method should only be called from within {@link #updateVertex}, because the output
+	 * record and collector are bound before that call.
+	 *
+	 * @param newValue the new value of the vertex
+	 */
+	public void setNewVertexValue(VertexValue newValue) {
+		outVal.setField(1, newValue);
+		out.collect(outVal);
+	}
+	
+	/** Returns the number of the current superstep. */
+	public int getSuperstep() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	/** Returns the iteration aggregator registered under the given name. */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	/** Returns the aggregate of the given name from the previous iteration. */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+	
+	// Runtime-only state, bound by init()/setOutput() after the function has been shipped.
+	// It is transient so that it never becomes part of the serialized UDF.
+	private transient IterationRuntimeContext runtimeContext;
+	
+	private transient Collector<Record> out;
+	
+	private transient Record outVal;
+	
+	
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+	}
+	
+	void setOutput(Record val, Collector<Record> out) {
+		this.out = out;
+		this.outVal = val;
+	}
+	
+	// serializability
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
new file mode 100644
index 0000000..4b6afd3
--- /dev/null
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.compiler.util.CompilerTestBase;
+
+
+/**
+ * Checks the optimizer plans produced for Spargel vertex-centric iterations, using the
+ * connected components functions as the example program.
+ */
+public class SpargelCompilerTest extends CompilerTestBase {
+
+	@Test
+	public void testSpargelCompiler() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> vertexIds = env.generateSequence(1, 2);
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+				
+				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+				
+				result.print();
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			WorksetIterationPlanNode iteration = checkIterationPlan(op);
+			
+			// check that the initial workset sort is outside the loop
+			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSpargelCompilerWithBroadcastVariable() {
+		try {
+			final String BC_VAR_NAME = "borat variable";
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> bcVar = env.fromElements(1L);
+				
+				DataSet<Long> vertexIds = env.generateSequence(1, 2);
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+				
+				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+				
+				VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
+				vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+				vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+				
+				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
+				
+				result.print();
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			checkIterationPlan(op);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Verifies the plan properties shared by all tests in this class:
+	 * <ul>
+	 *   <li>the sink is forwarded directly from the workset iteration;</li>
+	 *   <li>the solution set delta is a two-input node fed by a hash partition on field 0;</li>
+	 *   <li>the edge/workset join hash-partitions and caches the edges;</li>
+	 *   <li>the initial partitioning of both iteration inputs happens outside the loop.</li>
+	 * </ul>
+	 *
+	 * @param op the optimized plan of a program whose single sink consumes the iteration result
+	 * @return the workset iteration node, for test-specific follow-up checks
+	 */
+	private WorksetIterationPlanNode checkIterationPlan(OptimizedPlan op) {
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+		
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+		
+		// check the solution set join and the delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+		
+		DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+		assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+		
+		// check the join of the edges with the workset
+		DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+		assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+		assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+		
+		assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+		
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+		assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+		
+		return iteration;
+	}
+}


Mime
View raw message