flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [25/50] [abbrv] flink git commit: [FLINK-1201] [gelly] changed spargel classes to work with Vertex and Edge types
Date Wed, 11 Feb 2015 10:49:27 GMT
[FLINK-1201] [gelly] changed spargel classes to work with Vertex and Edge types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45c0491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45c0491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45c0491

Branch: refs/heads/master
Commit: d45c049167ae30ae42fc63a46d9da29df2369652
Parents: b0b1295
Author: vasia <vasilikikalavri@gmail.com>
Authored: Thu Jan 8 20:29:14 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  10 +-
 .../apache/flink/graph/library/PageRank.java    |   7 +-
 .../library/SingleSourceShortestPaths.java      |   5 +-
 .../flink/graph/spargel/MessagingFunction.java  |  89 ++-----
 .../flink/graph/spargel/OutgoingEdge.java       |  64 -----
 .../graph/spargel/VertexCentricIteration.java   | 237 ++++---------------
 .../graph/spargel/VertexUpdateFunction.java     |  12 +-
 7 files changed, 83 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 247f5fc..425a377 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -53,7 +53,6 @@ import flink.graphs.spargel.MessagingFunction;
 import flink.graphs.spargel.VertexCentricIteration;
 import flink.graphs.spargel.VertexUpdateFunction;
 import flink.graphs.utils.GraphUtils;
-import flink.graphs.utils.Tuple2ToVertexMap;
 import flink.graphs.validation.GraphValidator;
 
 /**
@@ -1058,15 +1057,12 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * @return
 	 */
-	@SuppressWarnings("unchecked")
 	public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K,
VV, M> vertexUpdateFunction,
     		MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations)
{
-    	DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>)
(DataSet<?>) vertices;
-    	DataSet<Tuple3<K, K, EV>> tupleEdges = (DataSet<Tuple3<K, K, EV>>)
(DataSet<?>) edges;
-    	DataSet<Tuple2<K, VV>> newVertices = tupleVertices.runOperation(
-    	                     VertexCentricIteration.withValuedEdges(tupleEdges,
+    	DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
+    	                     VertexCentricIteration.withEdges(edges,
 						vertexUpdateFunction, messagingFunction, maximumNumberOfIterations));
-		return new Graph<K, VV, EV>(newVertices.map(new Tuple2ToVertexMap<K, VV>()),
edges, context);
+		return new Graph<K, VV, EV>(newVertices, edges, context);
     }
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index b7ca52b..d29a9dc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -1,11 +1,10 @@
 package flink.graphs.library;
 
-
+import flink.graphs.Edge;
 import flink.graphs.Graph;
 import flink.graphs.GraphAlgorithm;
 import flink.graphs.spargel.MessageIterator;
 import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.OutgoingEdge;
 import flink.graphs.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
@@ -69,8 +68,8 @@ public class PageRank<K extends Comparable<K> & Serializable>
implements GraphAl
 
         @Override
         public void sendMessages(K vertexId, Double newRank) {
-            for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.target(), newRank * edge.edgeValue());
+            for (Edge<K, Double> edge : getOutgoingEdges()) {
+                sendMessageTo(edge.getTarget(), newRank * edge.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index c29909c..0da8a90 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -3,7 +3,6 @@ package flink.graphs.library;
 import flink.graphs.*;
 import flink.graphs.spargel.MessageIterator;
 import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.OutgoingEdge;
 import flink.graphs.spargel.VertexUpdateFunction;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -87,8 +86,8 @@ public class SingleSourceShortestPaths<K extends Comparable<K>
& Serializable> i
 
         @Override
         public void sendMessages(K vertexKey, Double newDistance) throws Exception {
-            for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.target(), newDistance + edge.edgeValue());
+            for (Edge<K, Double> edge : getOutgoingEdges()) {
+                sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index 52a881e..ab451bb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,10 +26,11 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Edge;
+
 /**
  * The base class for functions that produce messages between vertices as a part of a {@link
VertexCentricIteration}.
  * 
@@ -38,7 +39,8 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
VertexValue, Message, EdgeValue> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>
& Serializable, 
+	VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements
Serializable {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -79,19 +81,13 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
 	 * @return An iterator with all outgoing edges.
 	 */
 	@SuppressWarnings("unchecked")
-	public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() {
+	public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
 		if (edgesUsed) {
 			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()'
exactly once.");
 		}
 		edgesUsed = true;
-		
-		if (this.edgeWithValueIter != null) {
-			this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>)
edges);
-			return this.edgeWithValueIter;
-		} else {
-			this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
-			return this.edgeNoValueIter;
-		}
+		this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
+		return this.edgeIterator;
 	}
 	
 	/**
@@ -186,22 +182,15 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
 	
 	private Collector<Tuple2<VertexKey, Message>> out;
 	
-	private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter;
-	
-	private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;
+	private EdgesIterator<VertexKey, EdgeValue> edgeIterator;
 	
 	private boolean edgesUsed;
 	
 	
-	void init(IterationRuntimeContext context, boolean hasEdgeValue) {
+	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 		this.outValue = new Tuple2<VertexKey, Message>();
-		
-		if (hasEdgeValue) {
-			this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
-		} else {
-			this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
-		}
+		this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
 	}
 	
 	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out)
{
@@ -210,52 +199,15 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
 		this.edgesUsed = false;
 	}
 	
-	
-	
-	private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>,
EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey,
EdgeValue>>
-	{
-		private Iterator<Tuple2<VertexKey, VertexKey>> input;
-		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey,
EdgeValue>();
-		
-		
-		void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
-			this.input = input;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple2<VertexKey, VertexKey> next = input.next();
-			edge.set(next.f1, null);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
-			return this;
-		}
-	}
-	
-	
-	private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>,
EdgeValue> 
-		implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey,
EdgeValue>>
+	private static final class EdgesIterator<VertexKey extends Comparable<VertexKey>
& Serializable, 
+		EdgeValue extends Serializable> 
+		implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey,
EdgeValue>>
 	{
-		private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input;
+		private Iterator<Edge<VertexKey, EdgeValue>> input;
 		
-		private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey,
EdgeValue>();
+		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
 		
-		void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) {
+		void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
 			this.input = input;
 		}
 		
@@ -265,9 +217,10 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
 		}
 
 		@Override
-		public OutgoingEdge<VertexKey, EdgeValue> next() {
-			Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next();
-			edge.set(next.f1, next.f2);
+		public Edge<VertexKey, EdgeValue> next() {
+			Edge<VertexKey, EdgeValue> next = input.next();
+			edge.setTarget(next.f1);
+			edge.setValue(next.f2);
 			return edge;
 		}
 
@@ -276,7 +229,7 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>,
 			throw new UnsupportedOperationException();
 		}
 		@Override
-		public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
+		public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
 			return this;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
deleted file mode 100644
index 7505409..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/OutgoingEdge.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package flink.graphs.spargel;
-
-/**
- * <tt>Edge</tt> objects represent edges between vertices. Edges are defined
by their source and target
- * vertex id. Edges may have an associated value (for example a weight or a distance), if
the
- * graph algorithm was initialized with the
- * {@link VertexCentricIteration#withValuedEdges(org.apache.flink.api.java.DataSet, VertexUpdateFunction,
MessagingFunction, int)}
- * method.
- *
- * @param <VertexKey> The type of the vertex key.
- * @param <EdgeValue> The type of the value associated with the edge. For scenarios
where the edges do not hold
- *                    value, this type may be arbitrary.
- */
-public final class OutgoingEdge<VertexKey extends Comparable<VertexKey>, EdgeValue>
implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private VertexKey target;
-	
-	private EdgeValue edgeValue;
-	
-	void set(VertexKey target, EdgeValue edgeValue) {
-		this.target = target;
-		this.edgeValue = edgeValue;
-	}
-	
-	/**
-	 * Gets the target vertex id.
-	 * 
-	 * @return The target vertex id.
-	 */
-	public VertexKey target() {
-		return target;
-	}
-	
-	/**
-	 * Gets the value associated with the edge. The value may be null if the iteration was initialized
with
-	 * an edge data set without edge values.
-	 * Typical examples of edge values are weights or distances of the path represented by the
edge.
-	 *  
-	 * @return The value associated with the edge.
-	 */
-	public EdgeValue edgeValue() {
-		return edgeValue;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index b15c8c4..5f89e90 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -18,6 +18,7 @@
 
 package flink.graphs.spargel;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,13 +34,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Edge;
+import flink.graphs.Vertex;
+
 /**
  * This class represents iterative graph computations, programmed in a vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm
has also been
@@ -70,16 +73,15 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue,
Message, EdgeValue> 
-	implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey,
VertexValue>>
+public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> &
Serializable, VertexValue extends Serializable, 
+	Message, EdgeValue extends Serializable> 
+	implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey,
VertexValue>>
 {
 	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
 	
 	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
 	
-	private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
-	
-	private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
+	private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
 	
 	private final Map<String, Aggregator<?>> aggregators;
 	
@@ -91,7 +93,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	
 	private final TypeInformation<Message> messageType;
 	
-	private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;
+	private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
 	
 	private String name;
 	
@@ -101,64 +103,21 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	
 	// ----------------------------------------------------------------------------------
 	
-	private  VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message>
uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-			int maximumNumberOfIterations)
-	{
-		Validate.notNull(uf);
-		Validate.notNull(mf);
-		Validate.notNull(edgesWithoutValue);
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must
be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple2<VertexKey, VertexKey>> edgesType = edgesWithoutValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 2, "The edges
data set (for edges without edge values) must consist of 2-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"Both tuple fields (source and target vertex id) must be of the data type that represents
the vertex key and implement the java.lang.Comparable interface.");
-		
-		this.updateFunction = uf;
-		this.messagingFunction = mf;
-		this.edgesWithoutValue = edgesWithoutValue;
-		this.edgesWithValue = null;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
-		this.messageType = getMessageType(mf);
-	}
-	
 	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message>
uf,
 			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, 
-			int maximumNumberOfIterations,
-			boolean edgeHasValueMarker)
+			DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, 
+			int maximumNumberOfIterations, boolean edgeHasValueMarker)
 	{
 		Validate.notNull(uf);
 		Validate.notNull(mf);
 		Validate.notNull(edgesWithValue);
 		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must
be at least one.");
-		
-		// check that the edges are actually a valid tuple set of vertex key types
-		TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesType = edgesWithValue.getType();
-		Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == 3, "The edges
data set (for edges with edge values) must consist of 3-tuples.");
-		
-		TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
-		Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1))
-			&& Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()),
-			"The first two tuple fields (source and target vertex id) must be of the data type that
represents the vertex key and implement the java.lang.Comparable interface.");
-		
-		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must
be at least one.");
-		
+
 		this.updateFunction = uf;
 		this.messagingFunction = mf;
-		this.edgesWithoutValue = null;
 		this.edgesWithValue = edgesWithValue;
 		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();
-		
+		this.aggregators = new HashMap<String, Aggregator<?>>();		
 		this.messageType = getMessageType(mf);
 	}
 	
@@ -271,20 +230,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
 	 */
 	@Override
-	public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
-		// sanity check that we really have two tuples
-		TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = inputData.getType();
-		Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input
data set (the initial vertices) must consist of 2-tuples.");
-
-		// check that the key type here is the same as for the edges
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
-		TypeInformation<?> edgeType = edgesWithoutValue != null ? edgesWithoutValue.getType()
: edgesWithValue.getType();
-		TypeInformation<VertexKey> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
-		
-		Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of
the input data set (the initial vertices) " +
-				"must be the same data type as the first fields of the edge data set (the source vertex
id). " +
-				"Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.",
keyType, edgeKeyType);
-
+	public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
 		this.initialVertices = inputData;
 	}
 	
@@ -294,22 +240,22 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	 * @return The operator that represents this vertex-centric graph computation.
 	 */
 	@Override
-	public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
+	public DataSet<Vertex<VertexKey, VertexValue>> createResult() {
 		if (this.initialVertices == null) {
 			throw new IllegalStateException("The input data set has not been set.");
 		}
 		
 		// prepare some type information
-		TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+		TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
 		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType,
messageType);		
-		
+		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType,
messageType);
+
 		// set up the iteration operator
 		final String name = (this.name != null) ? this.name :
 			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
 		final int[] zeroKeyPos = new int[] {0};
 	
-		final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
iteration =
+		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
iteration =
 			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations,
zeroKeyPos);
 		iteration.name(name);
 		iteration.parallelism(parallelism);
@@ -322,14 +268,8 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		
 		// build the messaging function (co group)
 		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		if (edgesWithoutValue != null) {
-			MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey,
VertexValue, Message>(messagingFunction, messageTypeInfo);
-			messages = this.edgesWithoutValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
-		else {
-			MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger
= new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction,
messageTypeInfo);
-			messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		}
+		MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger
= new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction,
messageTypeInfo);
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
 		
 		// configure coGroup message function with name and broadcast variables
 		messages = messages.name("Messaging");
@@ -340,7 +280,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey,
VertexValue, Message>(updateFunction, vertexTypes);
 		
 		// build the update function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
+		CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 		
 		// configure coGroup update function with name and broadcast variables
@@ -355,43 +295,12 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		return iteration.closeWith(updates, updates);
 		
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Constructor builders to avoid signature conflicts with generic type erasure
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new vertex-centric iteration operator for graphs where the edges are not associated
with a value.
-	 * 
-	 * @param edgesWithoutValue The data set containing edges. Edges are represented as 2-tuples:
(source-id, target-id)
-	 * @param vertexUpdateFunction The function that updates the state of the vertices from
the incoming messages.
-	 * @param messagingFunction The function that turns changed vertex states into messages
along the edges.
-	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
-	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * 
-	 * @return An in stance of the vertex-centric graph computation operator.
-	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message>
-			VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(
-					DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue,
-						VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-						MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction,
-						int maximumNumberOfIterations)
-	{
-		@SuppressWarnings("unchecked")
-		MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = 
-								(MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
-		
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction,
tmf, edgesWithoutValue, maximumNumberOfIterations);
-	}
-	
+
 	/**
 	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated
with a value (such as
 	 * a weight or distance).
 	 * 
-	 * @param edgesWithValue The data set containing edges. Edges are represented as 2-tuples:
(source-id, target-id)
+	 * @param edgesWithValue The data set containing edges.
 	 * @param uf The function that updates the state of the vertices from the incoming messages.
 	 * @param mf The function that turns changed vertex states into messages along the edges.
 	 * 
@@ -402,9 +311,10 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	 * 
 	 * @return An in stance of the vertex-centric graph computation operator.
 	 */
-	public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message,
EdgeValue>
-			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(
-					DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue,
+	public static final <VertexKey extends Comparable<VertexKey> & Serializable,
VertexValue extends Serializable, 
+		Message, EdgeValue extends Serializable>
+			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
+					DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
 					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
 					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
 					int maximumNumberOfIterations)
@@ -416,9 +326,10 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>,
VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>,
Tuple2<VertexKey, VertexValue>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
+	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>
& Serializable, 
+		VertexValue extends Serializable, Message> 
+		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>,
Vertex<VertexKey, VertexValue>>
+		implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
 	{
 		private static final long serialVersionUID = 1L;
 		
@@ -426,25 +337,25 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 
 		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
 		
-		private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;
+		private transient TypeInformation<Vertex<VertexKey, VertexValue>> resultType;
 		
 		
 		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Tuple2<VertexKey, VertexValue>> resultType)
+				TypeInformation<Vertex<VertexKey, VertexValue>> resultType)
 		{
 			this.vertexUpdateFunction = vertexUpdateFunction;
 			this.resultType = resultType;
 		}
 
 		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey,
VertexValue>> vertex,
-				Collector<Tuple2<VertexKey, VertexValue>> out)
+		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Vertex<VertexKey,
VertexValue>> vertex,
+				Collector<Vertex<VertexKey, VertexValue>> out)
 			throws Exception
 		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+			final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
 			
 			if (vertexIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next();
+				Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
 				
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>)
(Iterator<?>) messages.iterator();
@@ -482,71 +393,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		}
 
 		@Override
-		public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
+		public TypeInformation<Vertex<VertexKey, VertexValue>> getProducedType() {
 			return this.resultType;
 		}
 	}
-	
-	/*
-	 * UDF that encapsulates the message sending function for graphs where the edges have no
associated values.
-	 */
-	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>,
VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey,
VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
-	{
-		private static final long serialVersionUID = 1L;
-		
-		private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;
-		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
-		private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message,
?> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
-		{
-			this.messagingFunction = messagingFunction;
-			this.resultType = resultType;
-		}
-		
-		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey,
Message>> out)
-			throws Exception
-		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), false);
-			}
-			
-			this.messagingFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.messagingFunction.postSuperstep();
-		}
 
-		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
-			return this.resultType;
-		}
-	}
-	
 	/*
 	 * UDF that encapsulates the message sending function for graphs where the edges have an
associated value.
 	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>,
VertexValue, Message, EdgeValue> 
-		extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey,
VertexValue>, Tuple2<VertexKey, Message>>
+	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>
& Serializable, 
+		VertexValue extends Serializable, Message, EdgeValue extends Serializable> 
+		extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>,
Tuple2<VertexKey, Message>>
 		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
 	{
 		private static final long serialVersionUID = 1L;
@@ -564,14 +421,14 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		}
 
 		@Override
-		public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges,
-				Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey,
Message>> out)
+		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+				Iterable<Vertex<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey,
Message>> out)
 			throws Exception
 		{
-			final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
+			final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
 			
 			if (stateIter.hasNext()) {
-				Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next();
+				Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
 				messagingFunction.set((Iterator<?>) edges.iterator(), out);
 				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
 			}
@@ -580,7 +437,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>,
Ver
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.messagingFunction.init(getIterationRuntimeContext(), true);
+				this.messagingFunction.init(getIterationRuntimeContext());
 			}
 			
 			this.messagingFunction.preSuperstep();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45c0491/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index c3fd2b1..e30451c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -23,10 +23,11 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import flink.graphs.Vertex;
+
 /**
  * This class must be extended by functions that compute the state of the vertex depending
on the old state and the
  * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)},
which is
@@ -36,7 +37,8 @@ import org.apache.flink.util.Collector;
  * <VertexValue> The vertex value type.
  * <Message> The message type.
  */
-public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>,
VertexValue, Message> implements Serializable {
+public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey>
& Serializable, 
+	VertexValue extends Serializable, Message> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -129,16 +131,16 @@ public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKe
 	
 	private IterationRuntimeContext runtimeContext;
 	
-	private Collector<Tuple2<VertexKey, VertexValue>> out;
+	private Collector<Vertex<VertexKey, VertexValue>> out;
 	
-	private Tuple2<VertexKey, VertexValue> outVal;
+	private Vertex<VertexKey, VertexValue> outVal;
 	
 	
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 	}
 	
-	void setOutput(Tuple2<VertexKey, VertexValue> val, Collector<Tuple2<VertexKey,
VertexValue>> out) {
+	void setOutput(Vertex<VertexKey, VertexValue> val, Collector<Vertex<VertexKey,
VertexValue>> out) {
 		this.out = out;
 		this.outVal = val;
 	}


Mime
View raw message