flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [65/73] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Sat, 12 Jul 2014 12:48:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..10ca85d
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest {
+	private JDBCInputFormat jdbcInputFormat;
+	private JDBCOutputFormat jdbcOutputFormat;
+
+	private static Connection conn;
+
+	static final Value[][] dbData = {
+		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
+		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
+		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
+		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
+		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
+
+	@BeforeClass
+	public static void setUpClass() {
+		try {
+			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+			prepareDerbyInputDatabase();
+			prepareDerbyOutputDatabase();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		 try {
+			 String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+			 conn = DriverManager.getConnection(dbURL);
+			 Statement stat = conn.createStatement();
+			 stat.executeUpdate("DROP TABLE books");
+			 stat.executeUpdate("DROP TABLE newbooks");
+			 stat.close();
+			 conn.close();
+		 } catch (Exception e) {
+			 e.printStackTrace();
+			 Assert.fail();
+		 } 
+	}
+	
+	private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
+		try {
+			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+			conn = DriverManager.getConnection(dbURL);
+			createTableBooks();
+			insertDataToSQLTables();
+			conn.close();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
+		try {
+			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+			conn = DriverManager.getConnection(dbURL);
+			createTableNewBooks();
+			conn.close();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void createTableBooks() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	private static void createTableNewBooks() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	private static void insertDataToSQLTables() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		Statement stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+
+	@After
+	public void tearDown() {
+		jdbcOutputFormat = null;
+		cleanUpDerbyDatabases();
+	}
+
+	@Test
+	public void testJDBCOutputFormat() throws IOException {
+		String sourceTable = "books";
+		String targetTable = "newbooks";
+		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
+		String dbUrl = "jdbc:derby:memory:ebookshop";
+
+		Configuration cfg = new Configuration();
+		cfg.setString("driver", driverPath);
+		cfg.setString("url", dbUrl);
+		cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
+		cfg.setInteger("fields", 5);
+		cfg.setClass("type0", IntValue.class);
+		cfg.setClass("type1", StringValue.class);
+		cfg.setClass("type2", StringValue.class);
+		cfg.setClass("type3", FloatValue.class);
+		cfg.setClass("type4", IntValue.class);
+
+		jdbcOutputFormat = new JDBCOutputFormat();
+		jdbcOutputFormat.configure(cfg);
+		jdbcOutputFormat.open(0,1);
+
+		jdbcInputFormat = new JDBCInputFormat(
+				driverPath,
+				dbUrl,
+				"select * from " + sourceTable);
+		jdbcInputFormat.configure(null);
+
+		Record record = new Record();
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(record);
+			jdbcOutputFormat.writeRecord(record);
+		}
+
+		jdbcOutputFormat.close();
+		jdbcInputFormat.close();
+
+		jdbcInputFormat = new JDBCInputFormat(
+				driverPath,
+				dbUrl,
+				"select * from " + targetTable);
+		jdbcInputFormat.configure(null);
+
+		int recordCount = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(record);
+			Assert.assertEquals(5, record.getNumFields());
+			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
+			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
+			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
+			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
+			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
+
+			int[] pos = {0, 1, 2, 3, 4};
+			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
+			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
+
+			recordCount++;
+		}
+		Assert.assertEquals(5, recordCount);
+
+		jdbcInputFormat.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/pom.xml b/flink-addons/flink-spargel/pom.xml
new file mode 100644
index 0000000..5136ad0
--- /dev/null
+++ b/flink-addons/flink-spargel/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-addons</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	
+	<artifactId>flink-spargel</artifactId>
+	<name>flink-spargel</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
new file mode 100644
index 0000000..3e1930c
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
+ * the <i>foreach</i> syntax.
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private transient Iterator<Tuple2<?, Message>> source;
+	
+	
+	final void setSource(Iterator<Tuple2<?, Message>> source) {
+		this.source = source;
+	}
+	
+	@Override
+	public final boolean hasNext() {
+		return this.source.hasNext();
+	}
+	
+	@Override
+	public final Message next() {
+		return this.source.next().f1;
+	}
+
+	@Override
+	public final void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<Message> iterator() {
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
new file mode 100644
index 0000000..1b5cbde
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
+ * 
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ */
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
+	 * It needs to produce the messages that will be received by vertices in the next superstep.
+	 * 
+	 * @param vertexKey The key of the vertex that was changed.
+	 * @param vertexValue The value (state) of the vertex that was changed.
+	 * 
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+	
+	/**
+	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+	
+	/**
+	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+	
+	
+	/**
+	 * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
+	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+	 * 
+	 * @return An iterable over all outgoing edges.
+	 */
+	@SuppressWarnings("unchecked")
+	public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+		}
+		edgesUsed = true;
+		
+		if (this.edgeWithValueIter != null) {
+			this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
+			return this.edgeWithValueIter;
+		} else {
+			this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
+			return this.edgeNoValueIter;
+		}
+	}
+	
+	/**
+	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+	 * This method is mutually exclusive with the method {@link #getOutgoingEdges()} and may be called only once.
+	 * 
+	 * @param m The message to send.
+	 */
+	public void sendMessageToAllNeighbors(Message m) {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+		}
+		
+		edgesUsed = true;
+		
+		outValue.f1 = m;
+		
+		while (edges.hasNext()) {
+			Tuple next = (Tuple) edges.next();
+			VertexKey k = next.getField(1);
+			outValue.f0 = k;
+			out.collect(outValue);
+		}
+	}
+	
+	/**
+	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+	 * the next superstep will cause an exception due to a non-deliverable message.
+	 * 
+	 * @param target The key (id) of the target vertex to message.
+	 * @param m The message.
+	 */
+	public void sendMessageTo(VertexKey target, Message m) {
+		outValue.f0 = target;
+		outValue.f1 = m;
+		out.collect(outValue);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * 
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+	
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * 
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+	
+	private Tuple2<VertexKey, Message> outValue;
+	
+	private IterationRuntimeContext runtimeContext;
+	
+	private Iterator<?> edges;
+	
+	private Collector<Tuple2<VertexKey, Message>> out;
+	
+	private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
+	
+	private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
+	
+	private boolean edgesUsed;
+	
+	
+	void init(IterationRuntimeContext context, boolean hasEdgeValue) {
+		this.runtimeContext = context;
+		this.outValue = new Tuple2<VertexKey, Message>();
+		
+		if (hasEdgeValue) {
+			this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
+		} else {
+			this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
+		}
+	}
+	
+	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
+		this.edges = edges;
+		this.out = out;
+		this.edgesUsed = false;
+	}
+	
+	
+	
+	private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
+		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
+	{
+		private Iterator<Tuple2<VertexKey, VertexKey>> input;
+		
+		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
+		
+		
+		void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
+			this.input = input;
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public OutgoingEdge<VertexKey, EdgeValue> next() {
+			Tuple2<VertexKey, VertexKey> next = input.next();
+			edge.set(next.f1, null);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+			return this;
+		}
+	}
+	
+	
+	private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
+		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
+	{
+		private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
+		
+		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
+		
+		void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
+			this.input = input;
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public OutgoingEdge<VertexKey, EdgeValue> next() {
+			Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
+			edge.set(next.f1, next.f2);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+		@Override
+		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+			return this;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
new file mode 100644
index 0000000..aef9d0b
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+/**
+ * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
+ * vertex id. Edges may have an associated value (for example a weight or a distance), if the
+ * graph algorithm was initialized with the
+ * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
+ * method.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
+ *                    a value, this type may be arbitrary.
+ */
+public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private VertexKey target;
+	
+	private EdgeValue edgeValue;
+	
+	void set(VertexKey target, EdgeValue edgeValue) {
+		this.target = target;
+		this.edgeValue = edgeValue;
+	}
+	
+	/**
+	 * Gets the target vertex id.
+	 * 
+	 * @return The target vertex id.
+	 */
+	public VertexKey target() {
+		return target;
+	}
+	
+	/**
+	 * Gets the value associated with the edge. The value may be null if the iteration was initialized with
+	 * an edge data set without edge values.
+	 * Typical examples of edge values are weights or distances of the path represented by the edge.
+	 *  
+	 * @return The value associated with the edge.
+	 */
+	public EdgeValue edgeValue() {
+		return edgeValue;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
new file mode 100644
index 0000000..bb84cea
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DeltaIteration;
+import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class represents iterative graph computations, programmed in a vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates its state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ *   <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state for
+ *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ *   considered updated.</li>
+ *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ * Vertex-centric graph iterations are instantiated by the
+ * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
+ * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
+ * the graph's edges are carrying values.
+ *
+ * @param <VertexKey> The type of the vertex key (the vertex identifier).
+ * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EdgeValue> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
+	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+{
+	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
+	
+	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+	
+	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
+	
+	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
+	
+	private final Map<String, Aggregator<?>> aggregators;
+	
+	private final int maximumNumberOfIterations;
+	
+	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+	
+	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
+	
+	private final TypeInformation<Message> messageType;
+	
+	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
+	
+	private String name;
+	
+	private int parallelism = -1;
+		
+	// ----------------------------------------------------------------------------------
+	
+	private  VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+			int maximumNumberOfIterations)
+	{
+		Validate.notNull(uf);
+		Validate.notNull(mf);
+		Validate.notNull(edgesWithoutValue);
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		
+		// check that the edges are actually a valid tuple set of vertex key types
+		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
+		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
+		
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+			"Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+		
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithoutValue = edgesWithoutValue;
+		this.edgesWithValue = null;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+		this.aggregators = new HashMap<String, Aggregator<?>>();
+		
+		this.messageType = getMessageType(mf);
+	}
+	
+	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, 
+			int maximumNumberOfIterations,
+			boolean edgeHasValueMarker)
+	{
+		Validate.notNull(uf);
+		Validate.notNull(mf);
+		Validate.notNull(edgesWithValue);
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		
+		// check that the edges are actually a valid tuple set of vertex key types
+		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
+		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
+		
+		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
+		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
+			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
+			"The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
+		
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithoutValue = null;
+		this.edgesWithValue = edgesWithValue;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+		this.aggregators = new HashMap<String, Aggregator<?>>();
+		
+		this.messageType = getMessageType(mf);
+	}
+	
+	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
+		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
+	}
+	
+	/**
+	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * 
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 * @param aggregator The aggregator.
+	 */
+	public void registerAggregator(String name, Aggregator<?> aggregator) {
+		this.aggregators.put(name, aggregator);
+	}
+	
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcast.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcast.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+	
+	/**
+	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+	 * 
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+	
+	/**
+	 * Gets the name from this vertex-centric iteration.
+	 * 
+	 * @return The name of the iteration.
+	 */
+	public String getName() {
+		return name;
+	}
+	
+	/**
+	 * Sets the degree of parallelism for the iteration.
+	 * 
+	 * @param parallelism The degree of parallelism.
+	 */
+	public void setParallelism(int parallelism) {
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+	
+	/**
+	 * Gets the iteration's degree of parallelism.
+	 * 
+	 * @return The iteration's parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Sets the input data set for this operator. In the case of this operator this input data set represents
+	 * the set of vertices with their initial state.
+	 * 
+	 * @param inputData The input data set, which in the case of this operator represents the set of
+	 *                  vertices with their initial state.
+	 * 
+	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+	 */
+	@Override
+	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
+		// sanity check that we really have 2-tuples
+		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
+		Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
+
+		// check that the key type here is the same as for the edges
+		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
+		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
+		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
+		
+		Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
+				"must be the same data type as the first fields of the edge data set (the source vertex id). " +
+				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
+
+		this.initialVertices = inputData;
+	}
+	
+	/**
+	 * Creates the operator that represents this vertex-centric graph computation.
+	 * 
+	 * @return The operator that represents this vertex-centric graph computation.
+	 */
+	@Override
+	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
+		if (this.initialVertices == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+		
+		// prepare some type information
+		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);		
+		
+		// set up the iteration operator
+		final String name = (this.name != null) ? this.name :
+			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
+		final int[] zeroKeyPos = new int[] {0};
+	
+		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
+			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+		iteration.name(name);
+		iteration.parallelism(parallelism);
+		
+		// register all aggregators
+		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
+			iteration.registerAggregator(entry.getKey(), entry.getValue());
+		}
+		
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
+		if (edgesWithoutValue != null) {
+			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
+			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+		}
+		else {
+			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
+			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
+		}
+		
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
+			messages = messages.withBroadcastSet(e.f1, e.f0);
+		}
+		
+		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+		
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+		
+		// configure coGroup update function with name and broadcast variables
+		updates = updates.name("Vertex State Updates");
+		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
+			updates = updates.withBroadcastSet(e.f1, e.f0);
+		}
+
+		// let the operator know that we preserve the key field
+		updates.withConstantSetFirst("0").withConstantSetSecond("0");
+		
+		return iteration.closeWith(updates, updates);
+		
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Constructor builders to avoid signature conflicts with generic type erasure
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
+	 * 
+	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
+	 * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
+	 * @param messagingFunction The function that turns changed vertex states into messages along the edges.
+	 * 
+	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
+	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * 
+	 * @return An instance of the vertex-centric graph computation operator.
+	 */
+	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
+			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
+					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
+						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+						int maximumNumberOfIterations)
+	{
+		@SuppressWarnings("unchecked")
+		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
+								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
+		
+		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
+	}
+	
+	/**
+	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
+	 * a weight or distance).
+	 * 
+	 * @param edgesWithValue The data set containing edges. Edges are represented as 3-tuples: (source-id, target-id, edge-value)
+	 * @param uf The function that updates the state of the vertices from the incoming messages.
+	 * @param mf The function that turns changed vertex states into messages along the edges.
+	 * 
+	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
+	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * @param <EdgeValue> The type of the values that are associated with the edges.
+	 * 
+	 * @return An instance of the vertex-centric graph computation operator.
+	 */
+	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
+			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
+					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+					int maximumNumberOfIterations)
+	{
+		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
+		extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+
+		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+		
+		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
+		
+		
+		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
+				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
+		{
+			this.vertexUpdateFunction = vertexUpdateFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex,
+				Collector<Tuple2<VertexKey, VertexValue>> out)
+			throws Exception
+		{
+			if (vertex.hasNext()) {
+				Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
+				
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages;
+				messageIter.setSource(downcastIter);
+				
+				vertexUpdateFunction.setOutput(vertexState, out);
+				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
+			} else {
+				if (messages.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Tuple2<VertexKey, Message> next = messages.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!.";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
+			return this.resultType;
+		}
+	}
+	
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
+	 */
+	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
+		extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
+		
+		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+		
+		
+		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
+				TypeInformation<Tuple2<VertexKey, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+		
+		@Override
+		public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges,
+				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+			throws Exception
+		{
+			if (state.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
+				messagingFunction.set((Iterator<?>) edges, out);
+				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext(), false);
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+	
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+	 */
+	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
+		extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+		
+		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
+		
+		
+		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+				TypeInformation<Tuple2<VertexKey, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
+				Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
+			throws Exception
+		{
+			if (state.hasNext()) {
+				Tuple2<VertexKey, VertexValue> newVertexState = state.next();
+				messagingFunction.set((Iterator<?>) edges, out);
+				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
+			}
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext(), true);
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+		
+		@Override
+		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
new file mode 100644
index 0000000..c072754
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ * 
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
+	 * 
+	 * @param vertexKey The key (identifier) of the vertex.
+	 * @param vertexValue The value (state) of the vertex.
+	 * @param inMessages The incoming messages to this vertex.
+	 * 
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+	
+	/**
+	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+	
+	/**
+	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+	
+	/**
+	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+	 * 
+	 * @param newValue The new vertex value.
+	 */
+	public void setNewVertexValue(VertexValue newValue) {
+		outVal.f1 = newValue;
+		out.collect(outVal);
+	}
+	
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * 
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+	
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * 
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+	
+	private IterationRuntimeContext runtimeContext;
+	
+	private Collector<Tuple2<VertexKey, VertexValue>> out;
+	
+	private Tuple2<VertexKey, VertexValue> outVal;
+	
+	
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+	}
+	
+	void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
+		this.out = out;
+		this.outVal = val;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
new file mode 100644
index 0000000..ea90feb
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Example program that runs a connected-components style computation with the vertex-centric
+ * iteration API (spargel) on a small, hard-coded graph.
+ * Every vertex starts with its own id as value. The messaging function sends the current value
+ * of a vertex to all its neighbors, and the update function adopts the smallest received value
+ * if that value is lower than the current one.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class SpargelConnectedComponents {
+
+	public static void main(String[] args) throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// the vertex ids come from a generated sequence, the edges are listed explicitly
+		DataSet<Long> ids = env.generateSequence(0, 10);
+		DataSet<Tuple2<Long, Long>> edges = env.fromElements(
+				new Tuple2<Long, Long>(0L, 2L),
+				new Tuple2<Long, Long>(2L, 4L),
+				new Tuple2<Long, Long>(4L, 8L),
+				new Tuple2<Long, Long>(1L, 5L),
+				new Tuple2<Long, Long>(3L, 7L),
+				new Tuple2<Long, Long>(3L, 9L));
+		
+		// each vertex is initially labeled with its own id
+		DataSet<Tuple2<Long, Long>> initialVertices = ids.map(new IdAssigner());
+		
+		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(
+				VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+		
+		result.print();
+		env.execute("Spargel Connected Components");
+	}
+	
+	/**
+	 * Update function that replaces the value of a vertex with the minimum of all incoming
+	 * messages, provided that minimum is smaller than the current value.
+	 */
+	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long smallest = Long.MAX_VALUE;
+			for (long candidate : inMessages) {
+				if (candidate < smallest) {
+					smallest = candidate;
+				}
+			}
+			if (vertexValue > smallest) {
+				setNewVertexValue(smallest);
+			}
+		}
+	}
+	
+	/**
+	 * Messaging function that sends the current component id of a vertex to all its neighbors.
+	 */
+	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
+		@Override
+		public void sendMessages(Long vertexId, Long componentId) {
+			sendMessageToAllNeighbors(componentId);
+		}
+	}
+	
+	/**
+	 * A map function that turns a Long value into a 2-tuple holding that value twice:
+	 * <pre>(Long value) -> (value, value)</pre>
+	 */
+	public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Long value) {
+			final Tuple2<Long, Long> vertexWithId = new Tuple2<Long, Long>(value, value);
+			return vertexWithId;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
new file mode 100644
index 0000000..c7fbaaa
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * The example generates its own vertices and random edges, runs the iteration and prints the result.
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRank {
+	
+	/** The damping factor that {@link #main(String[])} passes to the {@link VertexRankUpdater}. */
+	private static final double BETA = 0.85;
+
+	
+	public static void main(String[] args) throws Exception {
+		final int numVertices = 100;
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// enumerate the vertices and assign an initial uniform probability (rank)
+		DataSet<Tuple2<Long, Double>> initialRanks = env.generateSequence(1, numVertices)
+								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
+									@Override
+									public Tuple2<Long, Double> map(Long value) {
+										return new Tuple2<Long, Double>(value, 1.0/numVertices);
+									}
+								});
+		
+		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+									@Override
+									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+										// may be zero, in which case the vertex gets no outgoing edges
+										int numOutEdges = (int) (Math.random() * (numVertices / 2));
+										for (int i = 0; i < numOutEdges; i++) {
+											long target = (long) (Math.random() * numVertices) + 1;
+											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
+										}
+									}
+								});
+		
+		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(
+			VertexCentricIteration.withValuedEdges(edgesWithProbability,
+						new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
+		
+		result.print();
+		env.execute("Spargel PageRank");
+	}
+	
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+	 * and then applying the damping formula
+	 * <pre>newRank = beta * rankSum + (1 - beta) / numVertices</pre>
+	 */
+	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+		
+		private final long numVertices;
+		private final double beta;
+		
+		/**
+		 * @param numVertices The number of vertices, used to spread the random-jump share.
+		 * @param beta The damping factor applied to the summed incoming ranks.
+		 */
+		public VertexRankUpdater(long numVertices, double beta) {
+			this.numVertices = numVertices;
+			this.beta = beta;
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+			
+			// apply the damping factor / random jump; both terms must use the configured
+			// 'beta' of this instance (not the outer constant) so the formula stays consistent
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+	
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
+	 * which is associated with an edge as the edge value.
+	 */
+	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+		
+		@Override
+		public void sendMessages(Long vertexId, Double newRank) {
+			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.target(), newRank * edge.edgeValue());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
new file mode 100644
index 0000000..34c9ad8
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.examples;
+
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.OutgoingEdge;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
+ * In this implementation, the edges carry a weight (the transition probability).
+ * Unlike the plain PageRank example, the number of vertices is not passed as a constant to the
+ * update function: it is computed as a data set and made available as the broadcast set "count".
+ */
+@SuppressWarnings("serial")
+public class SpargelPageRankCountingVertices {
+	
+	/** The damping factor that {@link #main(String[])} passes to the {@link VertexRankUpdater}. */
+	private static final double BETA = 0.85;
+
+	
+	public static void main(String[] args) throws Exception {
+		final int NUM_VERTICES = 100;
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// a list of vertices
+		DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
+		
+		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
+								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+									@Override
+									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
+										// may be zero, in which case the vertex gets no outgoing edges
+										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
+										for (int i = 0; i < numOutEdges; i++) {
+											long target = (long) (Math.random() * NUM_VERTICES) + 1;
+											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
+										}
+									}
+								});
+		
+		// ---------- start of the algorithm ---------------
+		
+		// count the number of vertices: map every vertex to 1 and sum up
+		DataSet<Long> count = vertices
+			.map(new MapFunction<Long, Long>() {
+				@Override
+				public Long map(Long value) {
+					return 1L;
+				}
+			})
+			.reduce(new ReduceFunction<Long>() {
+				@Override
+				public Long reduce(Long value1, Long value2) {
+					return value1 + value2;
+				}
+			});
+		
+		// assign an initial uniform probability (rank) to every vertex, based on the broadcast vertex count
+		DataSet<Tuple2<Long, Double>> initialRanks = vertices
+			.map(new MapFunction<Long, Tuple2<Long, Double>>() {
+				
+				private long numVertices;
+				
+				@Override
+				public void open(Configuration parameters) {
+					// the broadcast set "count" is read through its first element
+					numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
+				}
+				
+				@Override
+				public Tuple2<Long, Double> map(Long value) {
+					return new Tuple2<Long, Double>(value, 1.0/numVertices);
+				}
+			}).withBroadcastSet(count, "count");
+		
+
+		VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
+				new VertexRankUpdater(BETA), new RankMessenger(), 20);
+		iteration.addBroadcastSetForUpdateFunction("count", count);
+		
+		
+		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(iteration);
+		
+		result.print();
+		env.execute("Spargel PageRank");
+	}
+	
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
+	 * and then applying the damping formula
+	 * <pre>newRank = beta * rankSum + (1 - beta) / numVertices</pre>
+	 * The number of vertices is read from the broadcast set "count" in {@link #preSuperstep()}.
+	 */
+	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
+		
+		private final double beta;
+		private long numVertices;
+		
+		/**
+		 * @param beta The damping factor applied to the summed incoming ranks.
+		 */
+		public VertexRankUpdater(double beta) {
+			this.beta = beta;
+		}
+		
+		@Override
+		public void preSuperstep() {
+			numVertices = this.<Long>getBroadcastSet("count").iterator().next();
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+			
+			// apply the damping factor / random jump; both terms must use the configured
+			// 'beta' of this instance (not the outer constant) so the formula stays consistent
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+	
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
+	 * which is associated with an edge as the edge value.
+	 */
+	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
+		
+		@Override
+		public void sendMessages(Long vertexId, Double newRank) {
+			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.target(), newRank * edge.edgeValue());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
new file mode 100644
index 0000000..ab29471
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Value;
+
+
+/**
+ * Holder for the target vertex key and the value of an edge.
+ * Both parts are assigned together through the package-private {@code set(...)} method and
+ * read through {@link #target()} and {@link #edgeValue()}.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <EdgeValue> The type of the edge value.
+ */
+public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
+	
+	private VertexKey target;
+	private EdgeValue edgeValue;
+	
+	/**
+	 * Replaces both the target key and the edge value held by this object.
+	 */
+	void set(VertexKey target, EdgeValue edgeValue) {
+		this.edgeValue = edgeValue;
+		this.target = target;
+	}
+	
+	/**
+	 * @return The target vertex key that was set last ({@code null} before the first assignment).
+	 */
+	public VertexKey target() {
+		return this.target;
+	}
+	
+	/**
+	 * @return The edge value that was set last ({@code null} before the first assignment).
+	 */
+	public EdgeValue edgeValue() {
+		return this.edgeValue;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
new file mode 100644
index 0000000..25ad748
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.util.Iterator;
+
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+
+/**
+ * An iterator over the messages contained in a sequence of records.
+ * Each call to {@link #next()} passes the same message instance to
+ * {@code Record.getFieldInto(1, ...)} and returns it, so callers always receive the one
+ * object that was handed to the constructor.
+ * The class is also its own {@link Iterable}: {@link #iterator()} returns this object.
+ *
+ * @param <Message> The type of the messages.
+ */
+public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
+
+	private final Message instance;
+	private Iterator<Record> source;
+	
+	/**
+	 * @param instance The message object that is filled and returned by every call to {@link #next()}.
+	 */
+	public MessageIterator(Message instance) {
+		this.instance = instance;
+	}
+	
+	/**
+	 * Sets the record iterator that subsequent calls to {@link #hasNext()} and {@link #next()} read from.
+	 */
+	public void setSource(Iterator<Record> source) {
+		this.source = source;
+	}
+	
+	@Override
+	public boolean hasNext() {
+		return source.hasNext();
+	}
+	
+	@Override
+	public Message next() {
+		final Record record = source.next();
+		record.getFieldInto(1, instance);
+		return instance;
+	}
+
+	/**
+	 * Removal is not supported by this iterator.
+	 */
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Iterator<Message> iterator() {
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
new file mode 100644
index 0000000..026b366
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for functions that send messages from a vertex to other vertices.
+ * Subclasses implement {@link #sendMessages(Key, Value)} and emit messages either through
+ * {@link #sendMessageToAllNeighbors(Value)} or by iterating over {@link #getOutgoingEdges()} and
+ * calling {@link #sendMessageTo(Key, Value)}. The edges can be consumed only once between two
+ * calls to the internal {@code set(...)} method, so only one of the two ways may be used.
+ *
+ * @param <VertexKey> The type of the vertex key.
+ * @param <VertexValue> The type of the vertex value.
+ * @param <Message> The type of the messages.
+ * @param <EdgeValue> The type of the edge values.
+ */
+public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Sends the messages for the given vertex. Implemented by subclasses.
+	 * 
+	 * @param vertexKey The key of the vertex.
+	 * @param vertexValue The value of the vertex.
+	 * @throws Exception Implementations may forward exceptions.
+	 */
+	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+	
+	/** Hook with an empty default implementation; the call site is outside this class. */
+	public void setup(Configuration config) throws Exception {}
+	
+	/** Hook with an empty default implementation; the call site is outside this class. */
+	public void preSuperstep() throws Exception {}
+	
+	/** Hook with an empty default implementation; the call site is outside this class. */
+	public void postSuperstep() throws Exception {}
+	
+	
+	/**
+	 * Returns an iterator over the outgoing edges. The iterator object and the {@link Edge}
+	 * object it returns are reused across calls.
+	 * 
+	 * @return The iterator over the edges.
+	 * @throws IllegalStateException If the edges were already consumed, by this method or by
+	 *                               {@link #sendMessageToAllNeighbors(Value)}.
+	 */
+	public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
+		markEdgesUsed();
+		edgeIter.set(edges);
+		return edgeIter;
+	}
+	
+	/**
+	 * Sends the given message to the vertex referenced by every remaining edge record.
+	 * 
+	 * @param m The message to send.
+	 * @throws IllegalStateException If the edges were already consumed, by this method or by
+	 *                               {@link #getOutgoingEdges()}.
+	 */
+	public void sendMessageToAllNeighbors(Message m) {
+		markEdgesUsed();
+		while (edges.hasNext()) {
+			Record next = edges.next();
+			// NOTE(review): the key is read from field 1 here, whereas EdgesIterator.next()
+			// reads the key from field 0 (and the edge value from field 1) - confirm which
+			// record layout is intended.
+			VertexKey k = next.getField(1, this.keyClass);
+			outValue.setField(0, k);
+			outValue.setField(1, m);
+			out.collect(outValue);
+		}
+	}
+	
+	/**
+	 * Sends the given message to the given target vertex. The same output record is reused
+	 * for every message.
+	 * 
+	 * @param target The key of the vertex that receives the message.
+	 * @param m The message to send.
+	 */
+	public void sendMessageTo(VertexKey target, Message m) {
+		outValue.setField(0, target);
+		outValue.setField(1, m);
+		out.collect(outValue);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/** @return The superstep number reported by the iteration runtime context. */
+	public int getSuperstep() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	/** @return The iteration aggregator registered under the given name. */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	/** @return The aggregate of the previous iteration registered under the given name. */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+	
+	private Record outValue;
+	
+	private IterationRuntimeContext runtimeContext;
+	
+	private Iterator<Record> edges;
+	
+	private Collector<Record> out;
+	
+	private EdgesIterator<VertexKey, EdgeValue> edgeIter;
+	
+	private Class<VertexKey> keyClass;
+	
+	private boolean edgesUsed;
+	
+	
+	/**
+	 * Initializes the runtime context, the reusable edge iterator and output record, and
+	 * remembers the key class (taken from the given key holder).
+	 */
+	@SuppressWarnings("unchecked")
+	void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
+		this.runtimeContext = context;
+		this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
+		this.outValue = new Record();
+		this.keyClass = (Class<VertexKey>) keyHolder.getClass();
+	}
+	
+	/**
+	 * Sets the edge records and the output collector, and allows the edges to be consumed again.
+	 */
+	void set(Iterator<Record> edges, Collector<Record> out) {
+		this.edges = edges;
+		this.out = out;
+		this.edgesUsed = false;
+	}
+	
+	/**
+	 * Marks the edges as consumed, failing if they were consumed before.
+	 */
+	private void markEdgesUsed() {
+		if (edgesUsed) {
+			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllNeighbors()'.");
+		}
+		edgesUsed = true;
+	}
+	
+	private static final long serialVersionUID = 1L;
+	
+	/**
+	 * Iterator that reads field 0 of each record into the key holder and field 1 into the
+	 * edge value holder, and returns them wrapped in one reused {@link Edge} object.
+	 */
+	private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
+
+		private Iterator<Record> input;
+		private VertexKey keyHolder;
+		private EdgeValue edgeValueHolder;
+		
+		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
+		
+		EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
+			this.keyHolder = keyHolder;
+			this.edgeValueHolder = edgeValueHolder;
+		}
+		
+		void set(Iterator<Record> input) {
+			this.input = input;
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public Edge<VertexKey, EdgeValue> next() {
+			Record next = input.next();
+			next.getFieldInto(0, keyHolder);
+			next.getFieldInto(1, edgeValueHolder);
+			edge.set(keyHolder, edgeValueHolder);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}


Mime
View raw message