flink-commits mailing list archives

From rmetz...@apache.org
Subject [60/92] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Tue, 22 Jul 2014 10:41:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
deleted file mode 100644
index c072754..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/VertexUpdateFunction.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on the old state and the
- * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
- * invoked once per vertex per superstep.
- * 
- * @param <VertexKey> The vertex key type.
- * @param <VertexValue> The vertex value type.
- * @param <Message> The message type.
- */
-public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
-	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
-	 * 
-	 * @param vertexKey The key (identifier) of the vertex.
-	 * @param vertexValue The value (state) of the vertex.
-	 * @param inMessages The incoming messages to this vertex.
-	 * 
-	 * @throws Exception The computation may throw exceptions, which cause the superstep to fail.
-	 */
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
-	
-	/**
-	 * This method is executed once per superstep, before the vertex update function is invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-	
-	/**
-	 * This method is executed once per superstep, after the vertex update function has been invoked for each vertex.
-	 * 
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-	
-	/**
-	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
-	 * 
-	 * @param newValue The new vertex value.
-	 */
-	public void setNewVertexValue(VertexValue newValue) {
-		outVal.f1 = newValue;
-		out.collect(outVal);
-	}
-	
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	/**
-	 * Gets the aggregated value that an aggregator computed in the previous iteration.
-	 * 
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
-	 * 
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Collector<Tuple2<VertexKey, VertexValue>> out;
-	
-	private Tuple2<VertexKey, VertexValue> outVal;
-	
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-	
-	void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey, VertexValue>> out) {
-		this.out = out;
-		this.outVal = val;
-	}
-}
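
For reference, a minimal sketch of how the aggregator accessors above can be used from an
update function. It assumes a LongSumAggregator (org.apache.flink.api.common.aggregators)
registered on the enclosing iteration under the hypothetical name "updates"; the sum becomes
readable in the following superstep via getPreviousIterationAggregate("updates"):

    public static final class CountingCCUpdater extends VertexUpdateFunction<Long, Long, Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
            long min = Long.MAX_VALUE;
            for (long msg : inMessages) {
                min = Math.min(min, msg);
            }
            if (min < vertexValue) {
                // record that this vertex changed in the current superstep
                this.<LongSumAggregator>getIterationAggregator("updates").aggregate(1L);
                setNewVertexValue(min);
            }
        }
    }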

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
deleted file mode 100644
index ea90feb..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class SpargelConnectedComponents {
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Long> vertexIds = env.generateSequence(0, 10);
-		DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(0L, 2L), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Long, Long>(4L, 8L),
-															new Tuple2<Long, Long>(1L, 5L), new Tuple2<Long, Long>(3L, 7L), new Tuple2<Long, Long>(3L, 9L));
-		
-		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		
-		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-		
-		result.print();
-		env.execute("Spargel Connected Components");
-	}
-	
-	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
-			long min = Long.MAX_VALUE;
-			for (long msg : inMessages) {
-				min = Math.min(min, msg);
-			}
-			if (min < vertexValue) {
-				setNewVertexValue(min);
-			}
-		}
-	}
-	
-	public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> {
-		@Override
-		public void sendMessages(Long vertexId, Long componentId) {
-			sendMessageToAllNeighbors(componentId);
-		}
-	}
-	
-	/**
-	 * A map function that takes a Long value and creates a 2-tuple out of it:
-	 * <pre>(Long value) -> (value, value)</pre>
-	 */
-	public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> {
-		@Override
-		public Tuple2<Long, Long> map(Long value) {
-			return new Tuple2<Long, Long>(value, value);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
deleted file mode 100644
index c7fbaaa..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability).
- */
-@SuppressWarnings("serial")
-public class SpargelPageRank {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int numVertices = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// enumerate the vertices and assign an initial uniform probability (rank) to each
-		DataSet<Tuple2<Long, Double>> initialRanks = env.generateSequence(1, numVertices)
-								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
-									public Tuple2<Long, Double> map(Long value) {
-										return new Tuple2<Long, Double>(value, 1.0/numVertices);
-									}
-								});
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (numVertices / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * numVertices) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(
-			VertexCentricIteration.withValuedEdges(edgesWithProbability,
-						new VertexRankUpdater(numVertices, BETA), new RankMessenger(), 20));
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final long numVertices;
-		private final double beta;
-		
-		public VertexRankUpdater(long numVertices, double beta) {
-			this.numVertices = numVertices;
-			this.beta = beta;
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}
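
For reference, the update implemented by VertexRankUpdater and RankMessenger above is the
standard damped PageRank rule. Each message carries newRank * edge.edgeValue(), and the edge
value is 1/outdeg(source), so after superstep k+1 the rank of vertex v is, in LaTeX:

    r_v^{(k+1)} = \beta \sum_{u \to v} \frac{r_u^{(k)}}{\mathrm{outdeg}(u)} + \frac{1 - \beta}{N}

with beta = 0.85 and N = numVertices, matching "(beta * rankSum) + (1 - beta) / numVertices"
in the code.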

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
deleted file mode 100644
index 34c9ad8..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.examples;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.OutgoingEdge;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * An implementation of the basic PageRank algorithm in the vertex-centric API (spargel).
- * In this implementation, the edges carry a weight (the transition probability), and the
- * number of vertices is not hard-wired into the functions but computed up front and made
- * available to the update function through a broadcast set.
- */
-@SuppressWarnings("serial")
-public class SpargelPageRankCountingVertices {
-	
-	private static final double BETA = 0.85;
-
-	
-	public static void main(String[] args) throws Exception {
-		final int NUM_VERTICES = 100;
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// a list of vertices
-		DataSet<Long> vertices = env.generateSequence(1, NUM_VERTICES);
-		
-		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
-									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
-										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
-										for (int i = 0; i < numOutEdges; i++) {
-											long target = (long) (Math.random() * NUM_VERTICES) + 1;
-											out.collect(new Tuple3<Long, Long, Double>(value, target, 1.0/numOutEdges));
-										}
-									}
-								});
-		
-		// ---------- start of the algorithm ---------------
-		
-		// count the number of vertices
-		DataSet<Long> count = vertices
-			.map(new MapFunction<Long, Long>() {
-				public Long map(Long value) {
-					return 1L;
-				}
-			})
-			.reduce(new ReduceFunction<Long>() {
-				public Long reduce(Long value1, Long value2) {
-					return value1 + value2;
-				}
-			});
-		
-		// assign an initial uniform probability (rank) to each vertex
-		DataSet<Tuple2<Long, Double>> initialRanks = vertices
-			.map(new MapFunction<Long, Tuple2<Long, Double>>() {
-				
-				private long numVertices;
-				
-				@Override
-				public void open(Configuration parameters) {
-					numVertices = getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next();
-				}
-				
-				public Tuple2<Long, Double> map(Long value) {
-					return new Tuple2<Long, Double>(value, 1.0/numVertices);
-				}
-			}).withBroadcastSet(count, "count");
-		
-
-		VertexCentricIteration<Long, Double, Double, Double> iteration = VertexCentricIteration.withValuedEdges(edgesWithProbability,
-				new VertexRankUpdater(BETA), new RankMessenger(), 20);
-		iteration.addBroadcastSetForUpdateFunction("count", count);
-		
-		
-		DataSet<Tuple2<Long, Double>> result = initialRanks.runOperation(iteration);
-		
-		result.print();
-		env.execute("Spargel PageRank");
-	}
-	
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-	 * and then applying the dampening formula.
-	 */
-	public static final class VertexRankUpdater extends VertexUpdateFunction<Long, Double, Double> {
-		
-		private final double beta;
-		private long numVertices;
-		
-		public VertexRankUpdater(double beta) {
-			this.beta = beta;
-		}
-		
-		@Override
-		public void preSuperstep() {
-			numVertices = this.<Long>getBroadcastSet("count").iterator().next();
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-			
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-	
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to the transition probability,
-	 * which is associated with an edge as the edge value.
-	 */
-	public static final class RankMessenger extends MessagingFunction<Long, Double, Double, Double> {
-		
-		@Override
-		public void sendMessages(Long vertexId, Double newRank) {
-			for (OutgoingEdge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.target(), newRank * edge.edgeValue());
-			}
-		}
-	}
-}
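
The count-then-broadcast pattern above is not specific to Spargel; presumably it is used
because this version of the DataSet API offers no count() shortcut. Condensed, for some
hypothetical DataSet<Long> named "data":

    DataSet<Long> count = data
        .map(new MapFunction<Long, Long>() {
            public Long map(Long value) { return 1L; }
        })
        .reduce(new ReduceFunction<Long>() {
            public Long reduce(Long a, Long b) { return a + b; }
        });
    // a consumer attaches it via .withBroadcastSet(count, "count") and reads it with
    // getRuntimeContext().<Long>getBroadcastVariable("count").iterator().next()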

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
deleted file mode 100644
index ab29471..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/Edge.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Value;
-
-
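-/**
- * A representation of an outgoing edge: the key of the target vertex and the edge value.
- */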
-public final class Edge<VertexKey extends Key<VertexKey>, EdgeValue extends Value> {
-	
-	private VertexKey target;
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	public VertexKey target() {
-		return target;
-	}
-	
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
deleted file mode 100644
index 25ad748..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessageIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.util.Iterator;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-
-public final class MessageIterator<Message extends Value> implements Iterator<Message>, Iterable<Message> {
-
-	private final Message instance;
-	private Iterator<Record> source;
-	
-	public MessageIterator(Message instance) {
-		this.instance = instance;
-	}
-	
-	public final void setSource(Iterator<Record> source) {
-		this.source = source;
-	}
-	
-	@Override
-	public final boolean hasNext() {
-		return this.source.hasNext();
-	}
-	
-	@Override
-	public final Message next() {
-		this.source.next().getFieldInto(1, this.instance);
-		return this.instance;
-	}
-
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-}
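
Note that next() above deserializes every message into a single reused instance. A sketch of
the consequence for callers that want to keep messages across calls (DoubleValue is just an
example message type):

    java.util.List<Double> kept = new java.util.ArrayList<Double>();
    for (DoubleValue msg : messages) {
        // copy the primitive out of the reused holder; storing 'msg' itself would leave
        // the list full of references to one object that holds only the last value
        kept.add(msg.getValue());
    }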

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
deleted file mode 100644
index 026b366..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/MessagingFunction.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-public abstract class MessagingFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> implements Serializable {
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
-	
-	public void setup(Configuration config) throws Exception {}
-	
-	public void preSuperstep() throws Exception {}
-	
-	public void postSuperstep() throws Exception {}
-	
-	
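-	/**
-	 * Gets an iterator over all outgoing edges of the current vertex. Since the underlying
-	 * records can be traversed only once per vertex and superstep, this method and
-	 * sendMessageToAllNeighbors(Message) are mutually exclusive within one sendMessages() call.
-	 */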
-	public Iterator<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()'.");
-		}
-		
-		edgesUsed = true;
-		edgeIter.set(edges);
-		return edgeIter;
-	}
-	
-	public void sendMessageToAllNeighbors(Message m) {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()'.");
-		}
-		
-		edgesUsed = true;
-		while (edges.hasNext()) {
-			Record next = edges.next();
-			VertexKey k = next.getField(1, this.keyClass);
-			outValue.setField(0, k);
-			outValue.setField(1, m);
-			out.collect(outValue);
-		}
-	}
-	
-	public void sendMessageTo(VertexKey target, Message m) {
-		outValue.setField(0, target);
-		outValue.setField(1, m);
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public int getSuperstep() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-	
-	private Record outValue;
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Iterator<Record> edges;
-	
-	private Collector<Record> out;
-	
-	private EdgesIterator<VertexKey, EdgeValue> edgeIter;
-	
-	private Class<VertexKey> keyClass;
-	
-	private boolean edgesUsed;
-	
-	
-	@SuppressWarnings("unchecked")
-	void init(IterationRuntimeContext context, VertexKey keyHolder, EdgeValue edgeValueHolder) {
-		this.runtimeContext = context;
-		this.edgeIter = new EdgesIterator<VertexKey, EdgeValue>(keyHolder, edgeValueHolder);
-		this.outValue = new Record();
-		this.keyClass = (Class<VertexKey>) keyHolder.getClass();
-	}
-	
-	void set(Iterator<Record> edges, Collector<Record> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final class EdgesIterator<VertexKey extends Key<VertexKey>, EdgeValue extends Value> implements Iterator<Edge<VertexKey, EdgeValue>> {
-
-		private Iterator<Record> input;
-		private VertexKey keyHolder;
-		private EdgeValue edgeValueHolder;
-		
-		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
-		
-		EdgesIterator(VertexKey keyHolder, EdgeValue edgeValueHolder) {
-			this.keyHolder = keyHolder;
-			this.edgeValueHolder = edgeValueHolder;
-		}
-		
-		void set(Iterator<Record> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public Edge<VertexKey, EdgeValue> next() {
-			Record next = input.next();
-			next.getFieldInto(0, keyHolder);
-			next.getFieldInto(1, edgeValueHolder);
-			edge.set(keyHolder, edgeValueHolder);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
deleted file mode 100644
index 3a58afc..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.AggregatorRegistry;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-public class SpargelIteration {
-	
-	private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
-	
-	private final DeltaIteration iteration;
-	
-	private final Class<? extends Key<?>> vertexKey;
-	private final Class<? extends Value> vertexValue;
-	private final Class<? extends Value> messageType;
-	private final Class<? extends Value> edgeValue;
-	
-	private final CoGroupOperator vertexUpdater;
-	private final CoGroupOperator messager;
-	
-	
-	// ----------------------------------------------------------------------------------
-	
-	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
-			SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
-	{
-		this(mf, uf, DEFAULT_NAME);
-	}
-	
-	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			String name)
-	{
-		// get the types
-		this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
-		this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
-		this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
-		this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
-		
-		if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
-			throw new RuntimeException("Could not determine the types of the vertex key, vertex value, message, or edge value.");
-		}
-	
-		// instantiate the data flow
-		this.iteration = new DeltaIteration(0, name);
-		
-		this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
-			.input2(iteration.getWorkset())
-			.name("Message Sender")
-			.build();
-		this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
-			.input1(messager)
-			.input2(iteration.getSolutionSet())
-			.name("Vertex Updater")
-			.build();
-		
-		iteration.setNextWorkset(vertexUpdater);
-		iteration.setSolutionSetDelta(vertexUpdater);
-		
-		// parameterize the data flow
-		try {
-			Configuration vertexUdfParams = vertexUpdater.getParameters();
-			InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
-			vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
-			vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
-			vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
-			
-			Configuration messageUdfParams = messager.getParameters();
-			InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
-			messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
-			messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
-			messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
-			messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Could not serialize the UDFs for distribution" + 
-					(e.getMessage() == null ? '.' : ": " + e.getMessage()), e);
-		}
-	}
-	
-	// ----------------------------------------------------------------------------------
-	//  inputs and outputs
-	// ----------------------------------------------------------------------------------
-	
-	public void setVertexInput(Operator<Record> c) {
-		this.iteration.setInitialSolutionSet(c);
-		this.iteration.setInitialWorkset(c);
-	}
-	
-	public void setEdgesInput(Operator<Record> c) {
-		this.messager.setFirstInput(c);
-	}
-	
-	public Operator<?> getOutput() {
-		return this.iteration;
-	}
-	
-	public void setDegreeOfParallelism(int dop) {
-		this.iteration.setDegreeOfParallelism(dop);
-	}
-	
-	public void setNumberOfIterations(int iterations) {
-		this.iteration.setMaximumNumberOfIterations(iterations);
-	}
-	
-	public AggregatorRegistry getAggregators() {
-		return this.iteration.getAggregators();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-	
-	@ConstantFieldsFirst(0)
-	public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
-		
-		private static final long serialVersionUID = 1L;
-		
-		private static final String UDF_PARAM = "spargel.udf";
-		private static final String KEY_PARAM = "spargel.key-type";
-		private static final String VALUE_PARAM = "spargel.value-type";
-		private static final String MESSAGE_PARAM = "spargel.message-type";
-		
-		private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
-		
-		private K vertexKey;
-		private V vertexValue;
-		private MessageIterator<M> messageIter;
-
-		@Override
-		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
-
-			if (vertex.hasNext()) {
-				Record first = vertex.next();
-				first.getFieldInto(0, vertexKey);
-				first.getFieldInto(1, vertexValue);
-				messageIter.setSource(messages);
-				vertexUpdateFunction.setOutput(first, out);
-				vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
-			} else {
-				if (messages.hasNext()) {
-					String message = "Target vertex does not exist!.";
-					try {
-						Record next = messages.next();
-						next.getFieldInto(0, vertexKey);
-						message = "Target vertex '" + vertexKey + "' does not exist!.";
-					} catch (Throwable t) {}
-					throw new Exception(message);
-				} else {
-					throw new Exception();
-				}
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// instantiate only the first time
-			if (vertexUpdateFunction == null) {
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
-				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
-				
-				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
-				messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
-				
-				try {
-					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
-				} catch (Exception e) {
-					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
-					throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
-				}
-				
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-				this.vertexUpdateFunction.setup(parameters);
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
-		}
-	}
-	
-	public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final String UDF_PARAM = "spargel.udf";
-		private static final String KEY_PARAM = "spargel.key-type";
-		private static final String VALUE_PARAM = "spargel.value-type";
-		private static final String MESSAGE_PARAM = "spargel.message-type";
-		private static final String EDGE_PARAM = "spargel.edge-value";
-		
-		
-		private MessagingFunction<K, V, M, E> messagingFunction;
-		
-		private K vertexKey;
-		private V vertexValue;
-		
-		@Override
-		public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
-			if (state.hasNext()) {
-				Record first = state.next();
-				first.getFieldInto(0, vertexKey);
-				first.getFieldInto(1, vertexValue);
-				messagingFunction.set(edges, out);
-				messagingFunction.sendMessages(vertexKey, vertexValue);
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// instantiate only the first time
-			if (messagingFunction == null) {
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
-//				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
-				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class);
-				
-				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
-				
-				K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
-				E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
-				
-				try {
-					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
-				} catch (Exception e) {
-					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
-					throw new Exception("Could not instantiate MessagingFunction" + message, e);
-				}
-				
-				this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
-				this.messagingFunction.setup(parameters);
-			}
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
-	}
-}
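
For orientation, a minimal sketch of how this class is wired into a record-API program;
"vertices" and "edges" stand for pre-built Operator<Record> inputs, and MyMessager/MyUpdater
for concrete subclasses of the two record-API functions above:

    SpargelIteration iteration = new SpargelIteration(new MyMessager(), new MyUpdater(), "CC (record API)");
    iteration.setVertexInput(vertices);    // serves as both initial solution set and initial workset
    iteration.setEdgesInput(edges);
    iteration.setNumberOfIterations(100);
    iteration.setDegreeOfParallelism(4);
    Operator<?> result = iteration.getOutput();  // wire into a sink and build the Plan as usual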

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java b/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
deleted file mode 100644
index 37e32cd..0000000
--- a/flink-addons/spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.spargel.java.record;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This class must be extended by functions that compute the state of the vertex depending on
- * the old state and the incoming messages.
- *
- * @param <VertexKey> The vertex key type.
- * @param <VertexValue> The vertex value type.
- * @param <Message> The message type.
- */
-public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-	
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
-	
-	public void setup(Configuration config) throws Exception {}
-	
-	public void preSuperstep() throws Exception {}
-	
-	public void postSuperstep() throws Exception {}
-	
-	public void setNewVertexValue(VertexValue newValue) {
-		outVal.setField(1, newValue);
-		out.collect(outVal);
-	}
-	
-	public int getSuperstep() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-	
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-	
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  internal methods
-	// --------------------------------------------------------------------------------------------
-	
-	private IterationRuntimeContext runtimeContext;
-	
-	private Collector<Record> out;
-	
-	private Record outVal;
-	
-	
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-	}
-	
-	void setOutput(Record val, Collector<Record> out) {
-		this.out = out;
-		this.outVal = val;
-	}
-	
-	// serializability
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
deleted file mode 100644
index 678b5e1..0000000
--- a/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.spargel.java;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.compiler.plan.DualInputPlanNode;
-import org.apache.flink.compiler.plan.OptimizedPlan;
-import org.apache.flink.compiler.plan.PlanNode;
-import org.apache.flink.compiler.plan.SinkPlanNode;
-import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
-import org.apache.flink.test.compiler.util.CompilerTestBase;
-
-
-public class SpargelCompilerTest extends CompilerTestBase {
-
-	@Test
-	public void testSpargelCompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> vertexIds = env.generateSequence(1, 2);
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-				
-				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-				
-				result.print();
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-			
-			// check that the initial workset sort is outside the loop
-			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
-			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSpargelCompilerWithBroadcastVariable() {
-		try {
-			final String BC_VAR_NAME = "borat variable";
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> bcVar = env.fromElements(1L);
-				
-				DataSet<Long> vertexIds = env.generateSequence(1, 2);
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-				
-				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-				
-				VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
-				vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
-				vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
-				
-				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
-				
-				result.print();
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
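
The compiler test above only inspects the optimized plan; the broadcast set it wires in is consumed inside the update function at runtime. A minimal consumer-side sketch for context: the class name is hypothetical, and the getBroadcastSet accessor and iterable MessageIterator are assumptions about the Spargel API of this period, not something this diff shows.

	// Hypothetical sketch: consumer side of addBroadcastSetForUpdateFunction(...)
	public static final class MinCCUpdater extends VertexUpdateFunction<Long, Long, Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
			// the set registered under BC_VAR_NAME above; retrieved only to
			// illustrate the lookup (accessor name is an assumption)
			java.util.Collection<Long> bcVar = getBroadcastSet("borat variable");

			long min = vertexValue;
			for (Long msg : inMessages) {
				min = Math.min(min, msg);
			}
			if (min < vertexValue) {
				setNewVertexValue(min);   // only a changed value triggers messaging
			}
		}
	}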

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
deleted file mode 100644
index e862e7c..0000000
--- a/flink-addons/spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.spargel.java;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.DeltaIterationResultSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.TwoInputUdfOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.spargel.java.MessageIterator;
-import org.apache.flink.spargel.java.MessagingFunction;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.VertexUpdateFunction;
-
-@SuppressWarnings("serial")
-public class SpargelTranslationTest {
-
-	@Test
-	public void testTranslationPlainEdges() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
-			final String BC_SET_UPDATES_NAME = "borat updates";
-			
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_DOP = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcMessaging = env.fromElements(1L);
-			DataSet<Long> bcUpdate = env.fromElements(1L);
-			
-			DataSet<Tuple2<String, Double>> result;
-			
-			// ------------ construct the test program ------------------
-			{
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
-	
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
-				
-				
-				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
-						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
-				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
-				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
-				
-				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_DOP);
-				
-				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
-				result = initialVertices.runOperation(vertexIteration);
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_DOP, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
-			
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-			
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_MESSAGES_NAME = "borat messages";
-			
-			final String BC_SET_UPDATES_NAME = "borat updates";
-			
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_DOP = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcVar = env.fromElements(1L);
-			
-			DataSet<Tuple2<String, Double>> result;
-			
-			// ------------ construct the test program ------------------
-			{
-				
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
-	
-				@SuppressWarnings("unchecked")
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
-				
-				
-				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
-						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
-				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
-				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
-				
-				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_DOP);
-				
-				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-				
-				result = initialVertices.runOperation(vertexIteration);
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_DOP, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
-			
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-			
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
-
-		@Override
-		public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
-	}
-	
-	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
-
-		@Override
-		public void sendMessages(String vertexKey, Double vertexValue) {}
-	}
-}
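
Both translation tests walk the same structure; for context, this is roughly the delta-iteration skeleton they expect Spargel to produce, written directly against the Java API. A hedged sketch: iterateDelta, name, parallelism, and registerAggregator are assumed to be the era's setters mirroring the getters the assertions call, and an identity delta stands in for the messaging and solution-set joins of the real translation.

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	@SuppressWarnings("unchecked")
	DataSet<Tuple2<String, Double>> vertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));

	DeltaIteration<Tuple2<String, Double>, Tuple2<String, Double>> loop =
			vertices.iterateDelta(vertices, 13, 0);      // max iterations, key position 0
	loop.name("Test Name");                              // read back via getName()
	loop.parallelism(77);                                // read back via getParallelism()
	loop.registerAggregator("AggregatorName", new LongSumAggregator());

	// identity delta as a placeholder; Spargel puts the two joins here
	DataSet<Tuple2<String, Double>> delta = loop.getWorkset();
	DataSet<Tuple2<String, Double>> result = loop.closeWith(delta, delta);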

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
deleted file mode 100644
index a34f2db..0000000
--- a/flink-addons/spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.spargel;
-
-import java.io.BufferedReader;
-
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.spargel.java.VertexCentricIteration;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
-import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
-
-	private static final long SEED = 9487520347802987L;
-	
-	private static final int NUM_VERTICES = 1000;
-	
-	private static final int NUM_EDGES = 10000;
-
-	private String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempFilePath("results");
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-		
-		DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
-		
-		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
-		
-		result.writeAsCsv(resultPath, "\n", " ");
-		env.execute("Spargel Connected Components");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (BufferedReader reader : getResultReader(resultPath)) {
-			ConnectedComponentsData.checkOddEvenResult(reader);
-		}
-	}
-	
-	public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> {
-		public Tuple2<Long, Long> map(String value) {
-			String[] nums = value.split(" ");
-			return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
-		}
-	}
-}
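
The checker delegated to in postSubmit() is not part of this diff. The invariant it presumably enforces, under the hedged assumption that the generator connects odd vertex ids only to odd ids and even only to even so that a vertex's component parity matches its own, can be sketched like this:

	// Hypothetical re-implementation of the parity check, reading the CSV
	// written with writeAsCsv(resultPath, "\n", " ") above.
	static void checkOddEvenSketch(String resultPath) throws java.io.IOException {
		java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.FileReader(resultPath));
		String line;
		while ((line = reader.readLine()) != null) {
			String[] fields = line.split(" ");
			long vertex = Long.parseLong(fields[0]);
			long component = Long.parseLong(fields[1]);
			if (vertex % 2 != component % 2) {
				throw new AssertionError("vertex " + vertex + " assigned component " + component);
			}
		}
		reader.close();
	}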

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/pom.xml b/flink-addons/yarn/pom.xml
deleted file mode 100644
index b475b26..0000000
--- a/flink-addons/yarn/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>yarn</artifactId>
-	<name>yarn</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>hadoop-core</artifactId>
-					<groupId>org.apache.hadoop</groupId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-yarn-client</artifactId>
-			<version>${hadoop.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<version>${hadoop.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<version>${hadoop.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-mapreduce-client-core</artifactId>
-			<version>${hadoop.version}</version>
-		</dependency>
-	</dependencies>
-</project>
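
This deletion is one half of a rename: per the commit subject, the module comes back with a flink- prefix. A sketch of the renamed coordinates, assuming only the artifactId and name change and the dependencies carry over unchanged:

	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-addons</artifactId>
		<version>0.6-incubating-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>

	<artifactId>flink-yarn</artifactId>
	<name>flink-yarn</name>
	<packaging>jar</packaging>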

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
deleted file mode 100644
index 40635dc..0000000
--- a/flink-addons/yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Writer;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.util.Records;
-
-import com.google.common.base.Preconditions;
-
-public class ApplicationMaster {
-
-	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-	
-	private void run() throws Exception  {
-		//Utils.logFilesInCurrentDirectory(LOG);
-		// Initialize clients to ResourceManager and NodeManagers
-		Configuration conf = Utils.initializeYarnConfiguration();
-		FileSystem fs = FileSystem.get(conf);
-		Map<String, String> envs = System.getenv();
-		final String currDir = envs.get(Environment.PWD.key());
-		final String logDirs =  envs.get(Environment.LOG_DIRS.key());
-		final String ownHostname = envs.get(Environment.NM_HOST.key());
-		final String appId = envs.get(Client.ENV_APP_ID);
-		final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
-		final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
-		final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
-		final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
-		final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
-		final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
-		
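-		// JVM heap for the TaskManagers: presumably sized below the container memory
-		// to leave headroom for non-heap JVM overhead (see Utils.calculateHeapSize)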
-		int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
-		
-		if(currDir == null) {
-			throw new RuntimeException("Current directory unknown");
-		}
-		if(ownHostname == null) {
-			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
-		}
-		LOG.info("Working directory "+currDir);
-		
-		// load Flink configuration.
-		Utils.getFlinkConfiguration(currDir);
-		
-		final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
-		
-		// Update yaml conf -> set jobManager address to this machine's address.
-		FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
-		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-		Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
-		String line;
-		while ((line = br.readLine()) != null) {
-			if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-			} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
-			} else {
-				output.append(line+"\n");
-			}
-		}
-		// just to make sure.
-		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
-		output.close();
-		br.close();
-		File newConf = new File(currDir+"/flink-conf-modified.yaml");
-		if(!newConf.exists()) {
-			LOG.warn("modified yaml does not exist!");
-		}
-		
-		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME, 
-				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		
-		JobManager jm;
-		{
-			String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
-			String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
-			
-			// start the job manager
-			jm = JobManager.initialize( args );
-			
-			// Start info server for jobmanager
-			jm.startInfoServer();
-		}
-		
-		AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
-		rmClient.init(conf);
-		rmClient.start();
-
-		NMClient nmClient = NMClient.createNMClient();
-		nmClient.init(conf);
-		nmClient.start();
-
-		// Register with ResourceManager
-		LOG.info("registering ApplicationMaster");
-		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
-
-		// Priority for worker containers - priorities are intra-application
-		Priority priority = Records.newRecord(Priority.class);
-		priority.setPriority(0);
-
-		// Resource requirements for worker containers
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(memoryPerTaskManager);
-		capability.setVirtualCores(coresPerTaskManager);
-
-		// Make container requests to ResourceManager
-		for (int i = 0; i < taskManagerCount; ++i) {
-			ContainerRequest containerAsk = new ContainerRequest(capability,
-					null, null, priority);
-			LOG.info("Requesting TaskManager container " + i);
-			rmClient.addContainerRequest(containerAsk);
-		}
-		
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
-		// register Flink Jar with remote HDFS
-		final Path remoteJarPath = new Path(remoteFlinkJarPath);
-		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
-		
-		// register conf with local fs.
-		Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
-		LOG.info("Prepared local resource for modified yaml: "+flinkConf);
-		
-		
-		boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
-		// prepare the files to ship
-		LocalResource[] remoteShipRsc = null;
-		String[] remoteShipPaths = shipListString.split(",");
-		if(!shipListString.isEmpty()) {
-			remoteShipRsc = new LocalResource[remoteShipPaths.length]; 
-			{ // scope for i
-				int i = 0;
-				for(String remoteShipPathStr : remoteShipPaths) {
-					if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
-						continue;
-					}
-					remoteShipRsc[i] = Records.newRecord(LocalResource.class);
-					Path remoteShipPath = new Path(remoteShipPathStr);
-					Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
-					i++;
-				}
-			}
-		}
-		
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-				
-		// Obtain allocated containers and launch
-		int allocatedContainers = 0;
-		int completedContainers = 0;
-		while (allocatedContainers < taskManagerCount) {
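-			// allocate() doubles as the AM->RM heartbeat; its float argument is the
-			// progress to report (0 while containers are still being allocated)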
-			AllocateResponse response = rmClient.allocate(0);
-			for (Container container : response.getAllocatedContainers()) {
-				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
-				++allocatedContainers;
-
-				// Launch the container by creating a ContainerLaunchContext
-				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-				
-				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts;
-				if(hasLog4j) {
-					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
-				}
-				tmCommand += " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
-						+ " 1>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stdout.log" 
-						+ " 2>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stderr.log";
-				ctx.setCommands(Collections.singletonList(tmCommand));
-				
-				LOG.info("Starting TM with command="+tmCommand);
-				
-				// copy resources to the TaskManagers.
-				Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-				localResources.put("flink.jar", flinkJar);
-				localResources.put("flink-conf.yaml", flinkConf);
-				
-				// add ship resources
-				if(!shipListString.isEmpty()) {
-					Preconditions.checkNotNull(remoteShipRsc);
-					for (int i = 0; i < remoteShipPaths.length; i++) {
-						// empty entries in the ship list leave null slots in remoteShipRsc; skip them
-						if (remoteShipRsc[i] != null) {
-							localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
-						}
-					}
-				}
-				
-				
-				ctx.setLocalResources(localResources);
-				
-				// Setup CLASSPATH for Container (=TaskTracker)
-				Map<String, String> containerEnv = new HashMap<String, String>();
-				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
-				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
-				
-				ctx.setEnvironment(containerEnv);
-
-				UserGroupInformation user = UserGroupInformation.getCurrentUser();
-				try {
-					Credentials credentials = user.getCredentials();
-					DataOutputBuffer dob = new DataOutputBuffer();
-					credentials.writeTokenStorageToStream(dob);
-					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
-							0, dob.getLength());
-					ctx.setTokens(securityTokens);
-				} catch (IOException e) {
-					LOG.warn("Getting current user info failed when trying to launch the container: "
-							+ e.getMessage());
-				}
-				
-				LOG.info("Launching container " + allocatedContainers);
-				nmClient.startContainer(container, ctx);
-			}
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(100);
-		}
-
-		// Now wait for containers to complete
-		
-		while (completedContainers < taskManagerCount) {
-			// cast to float: integer division would report zero progress until done
-			AllocateResponse response = rmClient.allocate((float) completedContainers / taskManagerCount);
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(5000);
-		}
-		LOG.info("Shutting down JobManager");
-		jm.shutdown();
-		
-		// Un-register with ResourceManager
-		rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
-	}
-	
-	public static void main(String[] args) throws Exception {
-		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"'; setting"
-				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					new ApplicationMaster().run();
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}
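
The run() method above reads its entire configuration from environment variables set by the client that launched the AM container. A hedged sketch of that contract from the client side: the helper name and parameters are hypothetical, but every key appears in the deleted code above.

	// Hypothetical helper on the Client side; keys mirror what ApplicationMaster reads.
	static java.util.Map<String, String> buildAmEnvironment(String appId, String clientHomeDir,
			String shipFiles, String clientUser, String flinkJarPath,
			int tmCount, int tmMemory, int tmCores) {
		java.util.Map<String, String> env = new java.util.HashMap<String, String>();
		env.put(Client.ENV_APP_ID, appId);
		env.put(Client.ENV_CLIENT_HOME_DIR, clientHomeDir);
		env.put(Client.ENV_CLIENT_SHIP_FILES, shipFiles);   // comma-separated, may be empty
		env.put(Client.ENV_CLIENT_USERNAME, clientUser);
		env.put(Client.FLINK_JAR_PATH, flinkJarPath);
		env.put(Client.ENV_TM_COUNT, String.valueOf(tmCount));
		env.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
		env.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
		return env;
	}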

