flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject flink git commit: [gelly] removes generic type constraints
Date Thu, 07 May 2015 22:32:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master ddb2b3470 -> ce2163e6c


[gelly] removes generic type constraints

This closes #657


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce2163e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce2163e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce2163e6

Branch: refs/heads/master
Commit: ce2163e6c987a9fbd9235f6e5b24c0aa8e475b04
Parents: ddb2b34
Author: vasia <vasia@apache.org>
Authored: Thu May 7 00:42:00 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Fri May 8 00:30:50 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Edge.java  |   5 +-
 .../org/apache/flink/graph/EdgesFunction.java   |   3 +-
 .../graph/EdgesFunctionWithVertexValue.java     |   3 +-
 .../main/java/org/apache/flink/graph/Graph.java | 203 +++++++------------
 .../org/apache/flink/graph/GraphAlgorithm.java  |   4 +-
 .../apache/flink/graph/NeighborsFunction.java   |   3 +-
 .../graph/NeighborsFunctionWithVertexValue.java |   3 +-
 .../apache/flink/graph/ReduceEdgesFunction.java |   2 +-
 .../flink/graph/ReduceNeighborsFunction.java    |   2 +-
 .../java/org/apache/flink/graph/Triplet.java    |   5 +-
 .../java/org/apache/flink/graph/Vertex.java     |   5 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |   3 +-
 .../apache/flink/graph/gsa/GatherFunction.java  |   2 +-
 .../graph/gsa/GatherSumApplyIteration.java      |  25 +--
 .../org/apache/flink/graph/gsa/Neighbor.java    |   5 +-
 .../org/apache/flink/graph/gsa/SumFunction.java |   2 +-
 .../flink/graph/library/LabelPropagation.java   |  10 +-
 .../apache/flink/graph/library/PageRank.java    |  11 +-
 .../library/SingleSourceShortestPaths.java      |  14 +-
 .../flink/graph/spargel/MessagingFunction.java  |   6 +-
 .../graph/spargel/VertexCentricIteration.java   |  13 +-
 .../graph/spargel/VertexUpdateFunction.java     |   3 +-
 .../flink/graph/utils/EdgeToTuple3Map.java      |   5 +-
 .../flink/graph/utils/Tuple2ToVertexMap.java    |   5 +-
 .../flink/graph/utils/Tuple3ToEdgeMap.java      |   5 +-
 .../flink/graph/utils/VertexToTuple2Map.java    |   5 +-
 .../flink/graph/validation/GraphValidator.java  |   3 +-
 .../validation/InvalidVertexIdsValidator.java   |  11 +-
 .../flink/graph/test/WeaklyConnectedITCase.java | 118 -----------
 29 files changed, 116 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index 1bef5b1..d84badb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 
 /**
@@ -30,8 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
  * @param <K> the key type for the sources and target vertices
  * @param <V> the edge value type
  */
-public class Edge<K extends Comparable<K> & Serializable, V extends Serializable> 
-	extends Tuple3<K, K, V>{
+public class Edge<K, V> extends Tuple3<K, K, V>{
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index aac63db..bf1d6a2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
  * @param <EV> the edge value type
  * @param <O> the type of the return value
  */
-public interface EdgesFunction<K extends Comparable<K> & Serializable, 
-	EV extends Serializable, O> extends Function, Serializable {
+public interface EdgesFunction<K, EV, O> extends Function, Serializable {
 
 	void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index f4f4320..0b0ab0e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -33,8 +33,7 @@ import org.apache.flink.util.Collector;
  * @param <EV> the edge value type
  * @param <O> the type of the return value
  */
-public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable, 
-	VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {
+public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
 
 	void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 1c0052d..490658c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -30,17 +29,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -77,7 +72,7 @@ import org.apache.flink.types.NullValue;
  * @param <EV> the value type for edges
  */
 @SuppressWarnings("serial")
-public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
+public class Graph<K, VV, EV> {
 
 	private final ExecutionEnvironment context;
 	private final DataSet<Vertex<K, VV>> vertices;
@@ -104,9 +99,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection(
-			Collection<Vertex<K, VV>> vertices, Collection<Edge<K, EV>> edges,
-			ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
+			Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
 
 		return fromDataSet(context.fromCollection(vertices),
 				context.fromCollection(edges), context);
@@ -121,8 +115,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromCollection(
-			Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
+	public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
+			ExecutionEnvironment context) {
 
 		return fromDataSet(context.fromCollection(edges), context);
 	}
@@ -138,9 +132,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection(
-			Collection<Edge<K, EV>> edges, final MapFunction<K, VV> mapper,
-			ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
+			final MapFunction<K, VV> mapper,ExecutionEnvironment context) {
 
 		return fromDataSet(context.fromCollection(edges), mapper, context);
 	}
@@ -153,9 +146,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet(
-			DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges,
-			ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
+			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
 
 		return new Graph<K, VV, EV>(vertices, edges, context);
 	}
@@ -169,7 +161,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromDataSet(
+	public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
 			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
 
 		DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
@@ -177,8 +169,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, NullValue, EV>(vertices, edges, context);
 	}
 
-	private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
+	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
+		Edge<K, EV>, Vertex<K, NullValue>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
 			out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
@@ -197,8 +189,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet(
-			DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
@@ -220,8 +212,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(vertices, edges, context);
 	}
 
-	private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+	private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
+		Edge<K, EV>, Tuple1<K>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
 			out.collect(new Tuple1<K>(edge.f0));
@@ -240,8 +232,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet(
-			DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
+			DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
 
 		DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
 		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
@@ -259,8 +251,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromTupleDataSet(
-			DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
+	public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
+			ExecutionEnvironment context) {
 
 		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
 		return fromDataSet(edgeDataSet, context);
@@ -278,8 +270,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet(
-			DataSet<Tuple3<K, K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 
 		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
 		return fromDataSet(edgeDataSet, mapper, context);
@@ -367,7 +359,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a new graph
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
+	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
 
@@ -393,7 +385,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a new graph
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
+	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
@@ -430,7 +422,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
 	}
 
-	private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, VV extends Serializable, T>
+	private static final class ApplyCoGroupToVertexValues<K, VV, T>
 			implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
 
 		private MapFunction<Tuple2<VV, T>, VV> mapper;
@@ -479,7 +471,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
 	}
 
-	private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, EV extends Serializable, T>
+	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
 			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
 
 		private MapFunction<Tuple2<EV, T>, EV> mapper;
@@ -530,7 +522,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
 	}
 
-	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, EV extends Serializable, T>
+	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
 			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
 
 		private MapFunction<Tuple2<EV, T>, EV> mapper;
@@ -643,8 +635,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	@ForwardedFieldsFirst("0->0;1->1;2->2")
-	private static final class ProjectEdge<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
+	private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<
+		Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
 		public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) {
 			out.collect(first);
 		}
@@ -660,7 +652,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
 	}
 
-	private static final class CountNeighborsCoGroup<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+	private static final class CountNeighborsCoGroup<K, VV, EV>
 			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
 		@SuppressWarnings("unused")
 		public void coGroup(Iterable<Vertex<K, VV>> vertex,	Iterable<Edge<K, EV>> outEdges,
@@ -772,8 +764,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectVertexIdMap<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+	private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
+		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
 		private int fieldPosition;
 
@@ -787,8 +779,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectVertexWithEdgeValueMap<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements MapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+	private static final class ProjectVertexWithEdgeValueMap<K, EV>	implements MapFunction<
+		Edge<K, EV>, Tuple2<K, EV>> {
 
 		private int fieldPosition;
 
@@ -802,8 +794,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable, T>
-			implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
+	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
+		Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
 
 		private EdgesFunction<K, EV, T> function;
 
@@ -821,32 +813,35 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class EmitOneEdgePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+	private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
+		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
 			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
 			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
 		}
 	}
 
-	private static final class EmitOneVertexWithEdgeValuePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+	private static final class EmitOneVertexWithEdgeValuePerNode<K, VV, EV>	implements FlatMapFunction<
+		Edge<K, EV>, Tuple2<K, EV>> {
+
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
 			out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
 			out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
 		}
 	}
 
-	private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
+	private static final class EmitOneEdgeWithNeighborPerNode<K, VV, EV> implements FlatMapFunction<
+		Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
+
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
 			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
 			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
 		}
 	}
 
-	private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T>
-			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
+	private static final class ApplyCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
+		Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
 
 		private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
 
@@ -866,7 +861,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T>
+	private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
 			implements	CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
 
 		private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
@@ -915,7 +910,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	@ForwardedFields("0->1;1->0;2->2")
-	private static final class ReverseEdgesMap<K extends Comparable<K> & Serializable, EV extends Serializable>
+	private static final class ReverseEdgesMap<K, EV>
 			implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
 
 		public Edge<K, EV> map(Edge<K, EV> value) {
@@ -955,7 +950,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return vertices.map(new ExtractVertexIDMapper<K, VV>());
 	}
 
-	private static final class ExtractVertexIDMapper<K extends Comparable<K> & Serializable, VV extends Serializable>
+	private static final class ExtractVertexIDMapper<K, VV>
 			implements MapFunction<Vertex<K, VV>, K> {
 		@Override
 		public K map(Vertex<K, VV> vertex) {
@@ -970,7 +965,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return edges.map(new ExtractEdgeIDsMapper<K, EV>());
 	}
 
-	private static final class ExtractEdgeIDsMapper<K extends Comparable<K> & Serializable, EV extends Serializable>
+	private static final class ExtractEdgeIDsMapper<K, EV>
 			implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
 		@Override
 		public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
@@ -979,67 +974,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * Checks the weak connectivity of a graph.
-	 * 
-	 * @param maxIterations
-	 *            the maximum number of iterations for the inner delta iteration
-	 * @return true if the graph is weakly connected.
-	 */
-	public boolean isWeaklyConnected(int maxIterations) throws Exception {
-		// first, convert to an undirected graph
-		Graph<K, VV, EV> graph = this.getUndirected();
-
-		DataSet<K> vertexIds = graph.getVertexIds();
-		DataSet<Tuple2<K, K>> verticesWithInitialIds = vertexIds
-				.map(new DuplicateVertexIDMapper<K>());
-
-		DataSet<Tuple2<K, K>> edgeIds = graph.getEdgeIds();
-
-		DeltaIteration<Tuple2<K, K>, Tuple2<K, K>> iteration = verticesWithInitialIds
-				.iterateDelta(verticesWithInitialIds, maxIterations, 0);
-
-		DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
-				.join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
-				.where(0).equalTo(0).with(new FindNeighborsJoin<K>())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
-				.with(new VertexWithNewComponentJoin<K>());
-
-		DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
-		return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
-	}
-
-	private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> {
-		@Override
-		public Tuple2<K, K> map(K k) {
-			return new Tuple2<K, K>(k, k);
-		}
-	}
-
-	private static final class FindNeighborsJoin<K> implements JoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> {
-		@Override
-		public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, K> edge) {
-			return new Tuple2<K, K>(edge.f1, vertexWithComponent.f1);
-		}
-	}
-
-	private static final class VertexWithNewComponentJoin<K extends Comparable<K>>
-			implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> {
-		@Override
-		public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, Collector<Tuple2<K, K>> out) {
-			if (candidate.f1.compareTo(old.f1) < 0) {
-				out.collect(candidate);
-			}
-		}
-	}
-
-	private static final class EmitFirstReducer<K> implements GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> {
-		public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) {
-			out.collect(values.iterator().next());
-		}
-	}
-
-	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges
 	 * will.
@@ -1098,7 +1032,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
 	}
 
-	private static final class RemoveVertexFilter<K extends Comparable<K> & Serializable, VV extends Serializable>
+	private static final class RemoveVertexFilter<K, VV>
 			implements FilterFunction<Vertex<K, VV>> {
 
 		private Vertex<K, VV> vertexToRemove;
@@ -1113,7 +1047,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class VertexRemovalEdgeFilter<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+	private static final class VertexRemovalEdgeFilter<K, VV, EV>
 			implements FilterFunction<Edge<K, EV>> {
 
 		private Vertex<K, VV> vertexToRemove;
@@ -1147,7 +1081,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
 	}
 
-	private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, EV extends Serializable>
+	private static final class EdgeRemovalEdgeFilter<K, EV>
 			implements FilterFunction<Edge<K, EV>> {
 		private Edge<K, EV> edgeToRemove;
 
@@ -1336,7 +1270,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ApplyNeighborGroupReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T>
+	private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
 			implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
 
 		private NeighborsFunction<K, VV, EV, T> function;
@@ -1355,7 +1289,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectVertexWithNeighborValueJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+	private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
 			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
 
 		private int fieldPosition;
@@ -1371,8 +1305,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+	private static final class ProjectVertexIdJoin<K, VV, EV> implements FlatJoinFunction<
+		Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+
 		private int fieldPosition;
 		public ProjectVertexIdJoin(int position) {
 			this.fieldPosition = position;
@@ -1384,8 +1319,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectNeighborValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements	FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
+	private static final class ProjectNeighborValue<K, VV, EV> implements FlatJoinFunction<
+		Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
 
 		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
 				Collector<Tuple2<K, VV>> out) {
@@ -1394,8 +1329,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements	FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+	private static final class ProjectEdgeWithNeighbor<K, VV, EV> implements FlatJoinFunction<
+		Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
 
 		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
 						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
@@ -1403,8 +1338,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T>
-			implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+	private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
+		Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
 
 		private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
 
@@ -1423,7 +1358,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T>
+	private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
 			implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
 
 		private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
@@ -1513,8 +1448,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	@ForwardedFields("f0")
-	private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable>
-			implements ReduceFunction<Tuple2<K, VV>> {
+	private static final class ApplyNeighborReduceFunction<K, VV> implements ReduceFunction<Tuple2<K, VV>> {
 
 		private ReduceNeighborsFunction<VV> function;
 
@@ -1561,8 +1495,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	@ForwardedFields("f0")
-	private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements ReduceFunction<Tuple2<K, EV>> {
+	private static final class ApplyReduceFunction<K, EV> implements ReduceFunction<Tuple2<K, EV>> {
 
 		private ReduceEdgesFunction<EV> function;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index ceeeaf4..04181d5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 /**
  * @param <K> key type
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
+public interface GraphAlgorithm<K, VV, EV> {
 
 	public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index b43f9d1..a21b23d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <EV> the edge value type
  * @param <O> the type of the return value
  */
-public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable, 
-	EV extends Serializable, O> extends Function, Serializable {
+public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable {
 
 	void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index 32d184d..fdf54fa 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -34,8 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <EV> the edge value type
  * @param <O> the type of the return value
  */
-public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, 
-	EV extends Serializable, O> extends Function, Serializable {
+public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
 
 	void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 707efbf..84eec51 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  *
  * @param <EV> the edge value type
  */
-public interface ReduceEdgesFunction<EV extends Serializable> extends Function, Serializable {
+public interface ReduceEdgesFunction<EV> extends Function, Serializable {
 
 	EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index 4b5a930..fc5295d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
  *
  * @param <VV> the vertex value type
  */
-public interface ReduceNeighborsFunction <VV extends Serializable> extends Function, Serializable {
+public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
 
 	VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index b85987d..dee3480 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -20,8 +20,6 @@ package org.apache.flink.graph;
 
 import org.apache.flink.api.java.tuple.Tuple5;
 
-import java.io.Serializable;
-
 /**
  * A Triplet stores and retrieves the edges along with their corresponding source and target vertices.
  * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method.
@@ -30,8 +28,7 @@ import java.io.Serializable;
  * @param <VV> the vertex value type
  * @param <EV> the edge value type
  */
-public class Triplet <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-		extends Tuple5<K, K, VV, VV, EV> {
+public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
index 2f71843..c5eb973 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
@@ -29,8 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
  * @param <K>
  * @param <V>
  */
-public class Vertex<K extends Comparable<K> & Serializable, V extends Serializable> 
-	extends Tuple2<K, V> {
+public class Vertex<K, V> extends Tuple2<K, V> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index a4963e0..7d24253 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -25,8 +25,7 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
-	implements Serializable {
+public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
 	public abstract void apply(M newValue, VV currentValue);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 4ffae8d..1c4b2c4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class GatherFunction<VV, EV, M> implements Serializable {
 
 	public abstract M gather(Neighbor<VV, EV> neighbor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 22be591..1de3839 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,8 +39,6 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
-import java.io.Serializable;
-
 /**
  * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
  *
@@ -49,8 +47,7 @@ import java.io.Serializable;
  * @param <EV> The type of the edge value in the graph
  * @param <M> The intermediate type used by the gather, sum and apply functions
  */
-public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
-		VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation<Vertex<K, VV>,
+public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
 		Vertex<K, VV>> {
 
 	private DataSet<Vertex<K, VV>> vertexDataSet;
@@ -159,10 +156,10 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	 *
 	 * @return An instance of the gather-sum-apply graph computation operator.
 	 */
-	public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
-			GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
-			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
-			int maximumNumberOfIterations) {
+	public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+		withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
+		SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
+
 		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
 	}
 
@@ -172,8 +169,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 
 	@SuppressWarnings("serial")
 	@ForwardedFields("f0")
-	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
+	private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
 			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
 
 		private final GatherFunction<VV, EV, M> gatherFunction;
@@ -210,8 +206,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
+	private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
 			implements ResultTypeQueryable<Tuple2<K, M>>{
 
 		private final SumFunction<VV, EV, M> sumFunction;
@@ -249,8 +244,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	}
 
 	@SuppressWarnings("serial")
-	private static final class ApplyUdf<K extends Comparable<K> & Serializable,
-			VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
+	private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
 			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
 
 		private final ApplyFunction<K, VV, M> applyFunction;
@@ -289,8 +283,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 
 	@SuppressWarnings("serial")
 	@ForwardedFieldsSecond("f1->f0")
-	private static final class ProjectKeyWithNeighbor<K extends Comparable<K> & Serializable,
-			VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<
+	private static final class ProjectKeyWithNeighbor<K, VV, EV> implements FlatJoinFunction<
 			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
 
 		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
index 5a06af9..7fa1ed2 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -20,8 +20,6 @@ package org.apache.flink.graph.gsa;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 
-import java.io.Serializable;
-
 /**
  * This class represents a <sourceVertex, edge> pair
  * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
@@ -29,8 +27,7 @@ import java.io.Serializable;
  * @param <EV> the edge value type
  */
 @SuppressWarnings("serial")
-public class Neighbor<VV extends Serializable, EV extends Serializable>
-		extends Tuple2<VV, EV> {
+public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
 
 	public Neighbor() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 4836af6..0a5e4ae 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class SumFunction<VV, EV, M> implements Serializable {
 
 	public abstract M sum(M arg0, M arg1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index ff6fe85..d63a4c3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.NullValue;
 
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -41,8 +40,7 @@ import java.util.Map.Entry;
  * 
  */
 @SuppressWarnings("serial")
-public class LabelPropagation<K extends Comparable<K> & Serializable>
-		implements GraphAlgorithm<K, Long, NullValue> {
+public class LabelPropagation<K> implements GraphAlgorithm<K, Long, NullValue> {
 
 	private final int maxIterations;
 
@@ -63,8 +61,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
 	 * Function that updates the value of a vertex by adopting the most frequent
 	 * label among its in-neighbors
 	 */
-	public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
-			extends VertexUpdateFunction<K, Long, Long> {
+	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
 
 		public void updateVertex(K vertexKey, Long vertexValue,
 				MessageIterator<Long> inMessages) {
@@ -105,8 +102,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
 	/**
 	 * Sends the vertex label to all out-neighbors
 	 */
-	public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable>
-			extends MessagingFunction<K, Long, Long, NullValue> {
+	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
 
 		public void sendMessages(K vertexKey, Long newLabel) {
 			sendMessageToAllNeighbors(newLabel);

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 48c9a51..bb0a1d1 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph.library;
 
-import java.io.Serializable;
-
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -27,8 +25,7 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
-public class PageRank<K extends Comparable<K> & Serializable> implements
-		GraphAlgorithm<K, Double, Double> {
+public class PageRank<K> implements	GraphAlgorithm<K, Double, Double> {
 
 	private double beta;
 	private int maxIterations;
@@ -51,8 +48,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	 * ranks from all incoming messages and then applying the dampening formula.
 	 */
 	@SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K extends Comparable<K> & Serializable>
-			extends VertexUpdateFunction<K, Double, Double> {
+	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
 
 		private final double beta;
 		private final long numVertices;
@@ -82,8 +78,7 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	 * value.
 	 */
 	@SuppressWarnings("serial")
-	public static final class RankMessenger<K extends Comparable<K> & Serializable>
-			extends MessagingFunction<K, Double, Double, Double> {
+	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
 
 		private final long numVertices;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 262b2c5..f4f8b27 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -27,11 +27,8 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
-import java.io.Serializable;
-
 @SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
-		implements GraphAlgorithm<K, Double, Double> {
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
 
 	private final K srcVertexId;
 	private final Integer maxIterations;
@@ -49,8 +46,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
 				maxIterations);
 	}
 
-	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
-			implements MapFunction<Vertex<K, Double>, Double> {
+	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
 
 		private K srcVertexId;
 
@@ -73,8 +69,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
 	 * 
 	 * @param <K>
 	 */
-	public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable>
-			extends VertexUpdateFunction<K, Double, Double> {
+	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
 
 		@Override
 		public void updateVertex(K vertexKey, Double vertexValue,
@@ -100,8 +95,7 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
 	 * 
 	 * @param <K>
 	 */
-	public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable>
-			extends MessagingFunction<K, Double, Double, Double> {
+	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
 
 		@Override
 		public void sendMessages(K vertexKey, Double newDistance)

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index e8a297f..b7e74e3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -38,8 +38,7 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, 
-	VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -198,8 +197,7 @@ public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>
 		this.edgesUsed = false;
 	}
 	
-	private static final class EdgesIterator<VertexKey extends Comparable<VertexKey> & Serializable, 
-		EdgeValue extends Serializable> 
+	private static final class EdgesIterator<VertexKey, EdgeValue> 
 		implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
 	{
 		private Iterator<Edge<VertexKey, EdgeValue>> input;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index ca66521..5ad1420 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.spargel;
 
-import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -69,8 +68,7 @@ import org.apache.flink.util.Collector;
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, 
-	Message, EdgeValue extends Serializable> 
+public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue> 
 	implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
 {
 	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
@@ -218,8 +216,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	 * 
 	 * @return An instance of the vertex-centric graph computation operator.
 	 */
-	public static final <VertexKey extends Comparable<VertexKey> & Serializable, VertexValue extends Serializable, 
-		Message, EdgeValue extends Serializable>
+	public static final <VertexKey, VertexValue, Message, EdgeValue>
 			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
 					DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
 					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
@@ -233,8 +230,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey> & Serializable, 
-		VertexValue extends Serializable, Message> 
+	private static final class VertexUpdateUdf<VertexKey, VertexValue, Message> 
 		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
 		implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
 	{
@@ -308,8 +304,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	/*
 	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
 	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey> & Serializable, 
-		VertexValue extends Serializable, Message, EdgeValue extends Serializable> 
+	private static final class MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> 
 		extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
 		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 5a7cd5c..561c87a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -36,8 +36,7 @@ import org.apache.flink.util.Collector;
  * <VertexValue> The vertex value type.
  * <Message> The message type.
  */
-public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKey> & Serializable, 
-	VertexValue extends Serializable, Message> implements Serializable {
+public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
index a7b7b62..c83fc9c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.graph.utils;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 
-public class EdgeToTuple3Map<K extends Comparable<K> & Serializable, 
-	EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
+public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
index d58e4ff..f9645dc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.graph.utils;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Vertex;
 
-public class Tuple2ToVertexMap<K extends Comparable<K> & Serializable, 
-	VV extends Serializable> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
+public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
index 3668dd2..afeff89 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph.utils;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
@@ -30,8 +28,7 @@ import org.apache.flink.graph.Edge;
  * @param <K>
  * @param <EV>
  */
-public class Tuple3ToEdgeMap<K extends Comparable<K> & Serializable, 
-	EV extends Serializable> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
+public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
index 318e1ed..9ce6f33 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.graph.utils;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Vertex;
 
-public class VertexToTuple2Map<K extends Comparable<K> & Serializable, 
-	VV extends Serializable> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
+public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 101e82c..75b672c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -30,8 +30,7 @@ import org.apache.flink.graph.Graph;
  * @param <EV> the edge value type
  */
 @SuppressWarnings("serial")
-public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-		implements Serializable {
+public abstract class GraphValidator<K, VV, EV>	implements Serializable {
 
 	public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index cc06ca7..33d469b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -28,11 +28,8 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
-import java.io.Serializable;
-
 @SuppressWarnings("serial")
-public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-		extends GraphValidator<K, VV, EV> {
+public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
 
 	/**
 	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
@@ -51,16 +48,14 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
 	}
 
-	private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+	private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
 			out.collect(new Tuple1<K>(edge.f0));
 			out.collect(new Tuple1<K>(edge.f1));
 		}
 	}
 
-	private static final class GroupInvalidIds<K extends Comparable<K> & Serializable, VV extends Serializable>
-			implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
+	private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
 		public void coGroup(Iterable<Vertex<K, VV>> vertexId,
 				Iterable<Tuple1<K>> edgeId, Collector<K> out) {
 			if (!(vertexId.iterator().hasNext())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
deleted file mode 100644
index 9db449e..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
-
-	public WeaklyConnectedITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testWithConnectedDirected() throws Exception {
-		/*
-		 * Test isWeaklyConnected() with a connected, directed graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
-		
-		env.execute();
-		expectedResult = "true\n";
-	}
-
-	@Test
-	public void testWithDisconnectedDirected() throws Exception {
-		/*
-		 * Test isWeaklyConnected() with a disconnected, directed graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-		
-		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
-		
-		env.execute();
-		expectedResult = "false\n";
-	}
-
-	@Test
-	public void testWithConnectedUndirected() throws Exception {
-		/*
-		 * Test isWeaklyConnected() with a connected, undirected graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
-		
-		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
-		
-		env.execute();
-		expectedResult = "true\n";
-	}
-
-	@Test
-	public void testWithDisconnectedUndirected() throws Exception {
-		/*
-		 * Test isWeaklyConnected() with a disconnected, undirected graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
-		
-		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
-		
-		env.execute();
-		expectedResult = "false\n";
-	}
-}


Mime
View raw message