flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [45/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c824ea1..0000000
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
-	private JDBCInputFormat jdbcInputFormat;
-	private JDBCOutputFormat jdbcOutputFormat;
-
-	private static Connection conn;
-
-	static final Value[][] dbData = {
-		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
-		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
-		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
-		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
-		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
-			prepareDerbyInputDatabase();
-			prepareDerbyOutputDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		 try {
-			 String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			 Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			 conn = DriverManager.getConnection(dbURL);
-			 Statement stat = conn.createStatement();
-			 stat.executeUpdate("DROP TABLE books");
-			 stat.executeUpdate("DROP TABLE newbooks");
-			 stat.close();
-			 conn.close();
-		 } catch (Exception e) {
-			 e.printStackTrace();
-			 Assert.fail();
-		 } 
-	}
-	
-	private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableBooks();
-			insertDataToSQLTables();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-			conn = DriverManager.getConnection(dbURL);
-			createTableNewBooks();
-			conn.close();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		} catch (SQLException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void createTableBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void createTableNewBooks() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-
-	@After
-	public void tearDown() {
-		jdbcOutputFormat = null;
-		cleanUpDerbyDatabases();
-	}
-
-	@Test
-	public void testJDBCOutputFormat() throws IOException {
-		String sourceTable = "books";
-		String targetTable = "newbooks";
-		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-		String dbUrl = "jdbc:derby:memory:ebookshop";
-
-		Configuration cfg = new Configuration();
-		cfg.setString("driver", driverPath);
-		cfg.setString("url", dbUrl);
-		cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
-		cfg.setInteger("fields", 5);
-		cfg.setClass("type0", IntValue.class);
-		cfg.setClass("type1", StringValue.class);
-		cfg.setClass("type2", StringValue.class);
-		cfg.setClass("type3", FloatValue.class);
-		cfg.setClass("type4", IntValue.class);
-
-		jdbcOutputFormat = new JDBCOutputFormat();
-		jdbcOutputFormat.configure(cfg);
-		jdbcOutputFormat.open(0,1);
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + sourceTable);
-		jdbcInputFormat.configure(null);
-
-		Record record = new Record();
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			jdbcOutputFormat.writeRecord(record);
-		}
-
-		jdbcOutputFormat.close();
-		jdbcInputFormat.close();
-
-		jdbcInputFormat = new JDBCInputFormat(
-				driverPath,
-				dbUrl,
-				"select * from " + targetTable);
-		jdbcInputFormat.configure(null);
-
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(record);
-			Assert.assertEquals(5, record.getNumFields());
-			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
-			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
-			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
-			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
-			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
-			int[] pos = {0, 1, 2, 3, 4};
-			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
-			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
-			recordCount++;
-		}
-		Assert.assertEquals(5, recordCount);
-
-		jdbcInputFormat.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/resources/log4j-test.properties b/flink-addons/flink-jdbc/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-addons/flink-jdbc/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/resources/logback-test.xml b/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/pom.xml b/flink-addons/flink-spargel/pom.xml
deleted file mode 100644
index 863e66e..0000000
--- a/flink-addons/flink-spargel/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>flink-spargel</artifactId>
-	<name>flink-spargel</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
deleted file mode 100644
index 535732d..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
- * the <i>foreach</i> syntax.
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private transient Iterator<Tuple2<?, Message>> source;
-	
-	
-	final void setSource(Iterator<Tuple2<?, Message>> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		return this.source.next().f1;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
deleted file mode 100644
index 41c2720..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/MessagingFunction.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- * 
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
-	 * It needs to produce the messages that will be received by vertices in the next superstep.
-	 * 
-	 * @param vertexKey The key of the vertex that was changed.
-	 * @param vertexValue The value (state) of the vertex that was changed.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-	
-	/**
-	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	
-	/**
-	 * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
-	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * 
-	 * @return An iterator with all outgoing edges.
-	 */
-	@SuppressWarnings("unchecked")
-	public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
-		}
-		edgesUsed = true;
-		
-		if (this.edgeWithValueIter != null) {
-			this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
-			return this.edgeWithValueIter;
-		} else {
-			this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
-			return this.edgeNoValueIter;
-		}
-	}
-	
-	/**
-	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
-	 * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
-	 * 
-	 * @param m The message to send.
-	 */
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
-		}
-		
-		edgesUsed = true;
-		
-		outValue.f1 = m;
-		
-		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
-			VertexKey k = next.getField(1);
-			outValue.f0 = k;
-			out.collect(outValue);
-		}
-	}
-	
-	/**
-	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-	 * the next superstep will cause an exception due to a non-deliverable message.
-	 * 
-	 * @param target The key (id) of the target vertex to message.
-	 * @param m The message.
-	 */
-	public void sendMessageTo(VertexKey target, Message m) {
-		outValue.f0 = target;
-		outValue.f1 = m;
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Tuple2<VertexKey, Message> outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<?> edges;
-	
-	private Collector<Tuple2<VertexKey, Message>> out;
-	
-	private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-	
-	private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
-	
-	private boolean edgesUsed;
-	
-	
-	void init(IterationRuntimeContext context, boolean hasEdgeValue) {
-		this.runtimeContext = context;
-		this.outValue = new Tuple2<VertexKey, Message>();
-		
-		if (hasEdgeValue) {
-			this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
-		} else {
-			this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
-		}
-	}
-	
-	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-	
-	
-	
-	private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple2<VertexKey, VertexKey>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		
-		void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple2<VertexKey, VertexKey> next = input.next();
-			edge.set(next.f1, null);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-	
-	
-	private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
-	{
-		private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();
-		
-		void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
-			edge.set(next.f1, next.f2);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
deleted file mode 100644
index 6d97525..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/OutgoingEdge.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a distance), if the
- * graph algorithm was initialized with the
- * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction, MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For scenarios where the edges do not hold
- *                    value, this type may be arbitrary.
- */
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private VertexKey target;
-	
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	/**
-	 * Gets the target vertex id.
-	 * 
-	 * @return The target vertex id.
-	 */
-	public VertexKey target() {
-		return target;
-	}
-	
-	/**
-	 * Gets the value associated with the edge. The value may be null if the iteration was initialized with
-	 * an edge data set without edge values.
-	 * Typical examples of edge values are weights or distances of the path represented by the edge.
-	 *  
-	 * @return The value associated with the edge.
-	 */
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
deleted file mode 100644
index 4f84467..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-/**
- * This class represents iterative graph computations, programmed in a vertex-centric perspective.
- * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been
- * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
- * <p>
- * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
- * algorithms send messages along the edges and update the state of vertices based on
- * the old state and the incoming messages. All vertices have an initial state.
- * The computation terminates once no vertex updates it state any more.
- * Additionally, a maximum number of iterations (supersteps) may be specified.
- * <p>
- * The computation is here represented by two functions:
- * <ul>
- *   <li>The {@link VertexUpdateFunction} receives incoming messages and may updates the state for
- *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
- *   considered updated.</li>
- *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
- *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
- * </ul>
- * <p>
- * Vertex-centric graph iterations are instantiated by the
- * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
- * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
- * the graph's edges are carrying values.
- *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
- */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-{
-	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-	
-	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-	
-	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-	
-	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
-	
-	private final Map<String, Aggregator<?>> aggregators;
-	
-	private final int maximumNumberOfIterations;
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final TypeInformation<Message> messageType;
-	
-	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
-	
-	private String name;
-	
-	private int parallelism = -1;
-	
-	private boolean unmanagedSolutionSet;
-	
-	// ----------------------------------------------------------------------------------
-	
-	private  VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-			int maximumNumberOfIterations)
-	{
-		Validate.notNull(uf);
-		Validate.notNull(mf);
-		Validate.notNull(edgesWithoutValue);
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = edgesWithoutValue;
-		this.edgesWithValue = null;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, 
-			int maximumNumberOfIterations,
-			boolean edgeHasValueMarker)
-	{
-		Validate.notNull(uf);
-		Validate.notNull(mf);
-		Validate.notNull(edgesWithValue);
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = null;
-		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
-	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
-		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
-	}
-	
-	/**
-	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
-	 * @param aggregator The aggregator.
-	 */
-	public void registerAggregator(String name, Aggregator<?> aggregator) {
-		this.aggregators.put(name, aggregator);
-	}
-	
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-	
-	/**
-	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
-	 * 
-	 * @param name The name for the iteration.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	/**
-	 * Gets the name from this vertex-centric iteration.
-	 * 
-	 * @return The name of the iteration.
-	 */
-	public String getName() {
-		return name;
-	}
-	
-	/**
-	 * Sets the degree of parallelism for the iteration.
-	 * 
-	 * @param parallelism The degree of parallelism.
-	 */
-	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the iteration's degree of parallelism.
-	 * 
-	 * @return The iterations parallelism, or -1, if not set.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-	/**
-	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
-	 */
-	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
-		this.unmanagedSolutionSet = unmanaged;
-	}
-	
-	/**
-	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @return True, if the solution set is in unmanaged memory, false otherwise.
-	 */
-	public boolean isSolutionSetUnmanagedMemory() {
-		return this.unmanagedSolutionSet;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom Operator behavior
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Sets the input data set for this operator. In the case of this operator this input data set represents
-	 * the set of vertices with their initial state.
-	 * 
-	 * @param inputData The input data set, which in the case of this operator represents the set of
-	 *                  vertices with their initial state.
-	 * 
-	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
-	 */
-	@Override
-	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
-		// sanity check that we really have two tuples
-		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
-		Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
-
-		// check that the key type here is the same as for the edges
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
-		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType() : edgesWithValue.getType();
-		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
-		
-		Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) " +
-				"must be the same data type as the first fields of the edge data set (the source vertex id). " +
-				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
-
-		this.initialVertices = inputData;
-	}
-	
-	/**
-	 * Creates the operator that represents this vertex-centric graph computation.
-	 * 
-	 * @return The operator that represents this vertex-centric graph computation.
-	 */
-	@Override
-	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
-		if (this.initialVertices == null) {
-			throw new IllegalStateException("The input data set has not been set.");
-		}
-		
-		// prepare some type information
-		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);		
-		
-		// set up the iteration operator
-		final String name = (this.name != null) ? this.name :
-			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
-		final int[] zeroKeyPos = new int[] {0};
-	
-		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
-			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
-		iteration.name(name);
-		iteration.parallelism(parallelism);
-		iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
-		
-		// register all aggregators
-		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
-			iteration.registerAggregator(entry.getKey(), entry.getValue());
-		}
-		
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		if (edgesWithoutValue != null) {
-			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		else {
-			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
-			messages = messages.withBroadcastSet(e.f1, e.f0);
-		}
-		
-		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-		
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-		
-		// configure coGroup update function with name and broadcast variables
-		updates = updates.name("Vertex State Updates");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
-			updates = updates.withBroadcastSet(e.f1, e.f0);
-		}
-
-		// let the operator know that we preserve the key field
-		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-		
-		return iteration.closeWith(updates, updates);
-		
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Constructor builders to avoid signature conflicts with generic type erasure
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated with a value.
-	 * 
-	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
-	 * @param vertexUpdateFunction The function that updates the state of the vertices from the incoming messages.
-	 * @param messagingFunction The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
-			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
-					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-						int maximumNumberOfIterations)
-	{
-		@SuppressWarnings("unchecked")
-		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
-								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-		
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
-	}
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
-	 * a weight or distance).
-	 * 
-	 * @param edgesWithValue The data set containing edges. Edges are represented as 2-tuples: (source-id, target-id)
-	 * @param uf The function that updates the state of the vertices from the incoming messages.
-	 * @param mf The function that turns changed vertex states into messages along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * @param <EdgeValue> The type of the values that are associated with the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
-			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
-					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
-					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-					int maximumNumberOfIterations)
-	{
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
-
-		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
-		
-		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
-		
-		
-		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
-		{
-			this.vertexUpdateFunction = vertexUpdateFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex,
-				Collector<Tuple2<VertexKey, VertexValue>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
-			
-			if (vertexIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
-				
-				@SuppressWarnings("unchecked")
-				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
-				messageIter.setSource(downcastIter);
-				
-				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
-			}
-			else {
-				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
-				if (messageIter.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Tuple2<VertexKey, Message> next = messageIter.next();
-						message = "Target vertex '" + next.f0 + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
-	 */
-	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), false);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
-	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-		extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), true);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-		
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
deleted file mode 100644
index a5548b7..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * <VertexKey> The vertex key type.
- * <VertexValue> The vertex value type.
- * <Message> The message type.
- */
-public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-	 * 
-	 * @param vertexKey The key (identifier) of the vertex.
-	 * @param vertexValue The value (state) of the vertex.
-	 * @param inMessages The incoming messages to this vertex.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
-	
-	/**
-	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed one per superstep after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	/**
-	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
-	 * 
-	 * @param newValue The new vertex value.
-	 */
-	public void setNewVertexValue(VertexValue newValue) {
-		outVal.f1 = newValue;
-		out.collect(outVal);
-	}
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Collector<Tuple2<VertexKey, VertexValue>> out;
-	
-	private Tuple2<VertexKey, VertexValue> outVal;
-	
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-	
-	void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
-		this.out = out;
-		this.outVal = val;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
deleted file mode 100644
index 7574dab..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class SpargelConnectedComponents {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Long> vertexIds = env.generateSequence(0, 10);
-		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(0L, 2L), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Long, Long>(4L, 8L),
-															new Tuple2<Long, Long>(1L, 5L), new Tuple2<Long, Long>(3L, 7L), new Tuple2<Long, Long>(3L, 9L));
-		
-		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		
-		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-		
-		result.print();
-		env.execute("Spargel Connected Components");
-	}
-	
-	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
-			long min = Long.MAX_VALUE;
-			for (long msg : inMessages) {
-				min = Math.min(min, msg);
-			}
-			if (min < vertexValue) {
-				setNewVertexValue(min);
-			}
-		}
-	}
-	
-	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
-		@Override
-		public void sendMessages(Long vertexId, Long componentId) {
-			sendMessageToAllNeighbors(componentId);
-		}
-	}
-	
-	/**
-	 * A map function that takes a Long value and creates a 2-tuple out of it:
-	 * <pre>(Long value) -> (value, value)</pre>
-	 */
-	public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> {
-		@Override
-		public Tuple2<Long, Long> map(Long value) {
-			return new Tuple2<Long, Long>(value, value);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
deleted file mode 100644
index fccb195..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability).
- */
-@SuppressWarnings("serial")
-public class SpargelPageRank {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int numVertices = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// enumerate some sample edges and assign an initial uniform probability (rank)
-		DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
-								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
-									public Tuple2<Long, Double> map(Long value) {
-										return new Tuple2<Long, Double>(value, 1.0/numVertices);
-									}
-								});
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (numVertices / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * numVertices) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		DataSet<Tuple2<Long, Double>> result = intialRanks.runOperation(
-			VertexCentricIteration.withValuedEdges(edgesWithProbability,
-						new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final long numVertices;
-		private final double beta;
-		
-		public VertexRankUpdater(long numVertices, double beta) {
-			this.numVertices = numVertices;
-			this.beta = beta;
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1-BETA)/numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
deleted file mode 100644
index ae4ee95..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability).
- */
-@SuppressWarnings("serial")
-public class SpargelPageRankCountingVertices {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int NUM_VERTICES = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// a list of vertices
-		DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * NUM_VERTICES) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		// ---------- start of the algorithm ---------------
-		
-		// count the number of vertices
-		DataSet<Long> count = vertices
-			.map(new MapFunction<Long, Long>() {
-				public Long map(Long value) {
-					return 1L;
-				}
-			})
-			.reduce(new ReduceFunction<Long>() {
-				public Long reduce(Long value1, Long value2) {
-					return value1 + value2;
-				}
-			});
-		
-		// enumerate some sample edges and assign an initial uniform probability (rank)
-		DataSet<Tuple2<Long, Double>> intialRanks = vertices
-			.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
-				
-				private long numVertices;
-				
-				@Override
-				public void open(Configuration parameters) {
-					numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
-				}
-				
-				public Tuple2<Long, Double> map(Long value) {
-					return new Tuple2<Long, Double>(value, 1.0/numVertices);
-				}
-			}).withBroadcastSet(count, "count");
-		
-
-		VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
-				new VertexRankUpdater(BETA), new RankMessenger(), 20);
-		iteration.addBroadcastSetForUpdateFunction("count", count);
-		
-		
-		DataSet<Tuple2<Long, Double>> result = intialRanks.runOperation(iteration);
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final double beta;
-		private long numVertices;
-		
-		public VertexRankUpdater(double beta) {
-			this.beta = beta;
-		}
-		
-		@Override
-		public void preSuperstep() {
-			numVertices = this.<Long>getBroadcastSet("count").iterator().next();
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1-BETA)/numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
deleted file mode 100644
index 0c39688..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Value;
-
-
-public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
-	
-	private VertexKey target;
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	public VertexKey target() {
-		return target;
-	}
-	
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
deleted file mode 100644
index 03022d2..0000000
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.util.Iterator;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-
-public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
-
-	private final Message instance;
-	private Iterator<Record> source;
-	
-	public MessageIterator(Message instance) {
-		this.instance = instance;
-	}
-	
-	public final void setSource(Iterator<Record> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		this.source.next().getFieldInto(1, this.instance);
-		return this.instance;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}


Mime
View raw message