Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5ED2D1733C for ; Thu, 7 May 2015 22:32:01 +0000 (UTC) Received: (qmail 82961 invoked by uid 500); 7 May 2015 22:32:01 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 82929 invoked by uid 500); 7 May 2015 22:32:01 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 82919 invoked by uid 99); 7 May 2015 22:32:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2015 22:32:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2172BE442E; Thu, 7 May 2015 22:32:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Message-Id: <8c258e79e153468f801afdfc244eaa5c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [gelly] removes generic type constraints Date: Thu, 7 May 2015 22:32:01 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master ddb2b3470 -> ce2163e6c [gelly] removes generic type constraints This closes #657 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce2163e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce2163e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce2163e6 Branch: refs/heads/master Commit: ce2163e6c987a9fbd9235f6e5b24c0aa8e475b04 Parents: ddb2b34 Author: vasia Authored: Thu May 7 00:42:00 2015 +0200 Committer: vasia Committed: Fri May 8 00:30:50 2015 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Edge.java | 5 +- .../org/apache/flink/graph/EdgesFunction.java | 3 +- .../graph/EdgesFunctionWithVertexValue.java | 3 +- .../main/java/org/apache/flink/graph/Graph.java | 203 +++++++------------ .../org/apache/flink/graph/GraphAlgorithm.java | 4 +- .../apache/flink/graph/NeighborsFunction.java | 3 +- .../graph/NeighborsFunctionWithVertexValue.java | 3 +- .../apache/flink/graph/ReduceEdgesFunction.java | 2 +- .../flink/graph/ReduceNeighborsFunction.java | 2 +- .../java/org/apache/flink/graph/Triplet.java | 5 +- .../java/org/apache/flink/graph/Vertex.java | 5 +- .../apache/flink/graph/gsa/ApplyFunction.java | 3 +- .../apache/flink/graph/gsa/GatherFunction.java | 2 +- .../graph/gsa/GatherSumApplyIteration.java | 25 +-- .../org/apache/flink/graph/gsa/Neighbor.java | 5 +- .../org/apache/flink/graph/gsa/SumFunction.java | 2 +- .../flink/graph/library/LabelPropagation.java | 10 +- .../apache/flink/graph/library/PageRank.java | 11 +- .../library/SingleSourceShortestPaths.java | 14 +- .../flink/graph/spargel/MessagingFunction.java | 6 +- .../graph/spargel/VertexCentricIteration.java | 13 +- .../graph/spargel/VertexUpdateFunction.java | 3 +- .../flink/graph/utils/EdgeToTuple3Map.java | 5 +- .../flink/graph/utils/Tuple2ToVertexMap.java | 5 +- .../flink/graph/utils/Tuple3ToEdgeMap.java | 5 +- .../flink/graph/utils/VertexToTuple2Map.java | 5 +- .../flink/graph/validation/GraphValidator.java | 3 +- .../validation/InvalidVertexIdsValidator.java | 11 +- .../flink/graph/test/WeaklyConnectedITCase.java | 118 ----------- 29 files changed, 116 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java index 1bef5b1..d84badb 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java @@ -18,8 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; - import org.apache.flink.api.java.tuple.Tuple3; /** @@ -30,8 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3; * @param the key type for the sources and target vertices * @param the edge value type */ -public class Edge & Serializable, V extends Serializable> - extends Tuple3{ +public class Edge extends Tuple3{ private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java index aac63db..bf1d6a2 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java @@ -32,8 +32,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface EdgesFunction & Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface EdgesFunction extends Function, Serializable { void iterateEdges(Iterable>> edges, Collector out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java index f4f4320..0b0ab0e 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java @@ -33,8 +33,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface EdgesFunctionWithVertexValue & Serializable, - VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { +public interface EdgesFunctionWithVertexValue extends Function, Serializable { void iterateEdges(Vertex v, Iterable> edges, Collector out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1c0052d..490658c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,7 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -30,17 +29,13 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; -import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -77,7 +72,7 @@ import org.apache.flink.types.NullValue; * @param the value type for edges */ @SuppressWarnings("serial") -public class Graph & Serializable, VV extends Serializable, EV extends Serializable> { +public class Graph { private final ExecutionEnvironment context; private final DataSet> vertices; @@ -104,9 +99,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromCollection( - Collection> vertices, Collection> edges, - ExecutionEnvironment context) { + public static Graph fromCollection(Collection> vertices, + Collection> edges, ExecutionEnvironment context) { return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context); @@ -121,8 +115,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromCollection( - Collection> edges, ExecutionEnvironment context) { + public static Graph fromCollection(Collection> edges, + ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), context); } @@ -138,9 +132,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromCollection( - Collection> edges, final MapFunction mapper, - ExecutionEnvironment context) { + public static Graph fromCollection(Collection> edges, + final MapFunction mapper,ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), mapper, context); } @@ -153,9 +146,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromDataSet( - DataSet> vertices, DataSet> edges, - ExecutionEnvironment context) { + public static Graph fromDataSet(DataSet> vertices, + DataSet> edges, ExecutionEnvironment context) { return new Graph(vertices, edges, context); } @@ -169,7 +161,7 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromDataSet( + public static Graph fromDataSet( DataSet> edges, ExecutionEnvironment context) { DataSet> vertices = edges.flatMap(new EmitSrcAndTarget()).distinct(); @@ -177,8 +169,8 @@ public class Graph & Serializable, VV extends Serializab return new Graph(vertices, edges, context); } - private static final class EmitSrcAndTarget & Serializable, EV extends Serializable> - implements FlatMapFunction, Vertex> { + private static final class EmitSrcAndTarget implements FlatMapFunction< + Edge, Vertex> { public void flatMap(Edge edge, Collector> out) { out.collect(new Vertex(edge.f0, NullValue.getInstance())); @@ -197,8 +189,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromDataSet( - DataSet> edges, final MapFunction mapper, ExecutionEnvironment context) { + public static Graph fromDataSet(DataSet> edges, + final MapFunction mapper, ExecutionEnvironment context) { TypeInformation keyType = ((TupleTypeInfo) edges.getType()).getTypeAt(0); @@ -220,8 +212,8 @@ public class Graph & Serializable, VV extends Serializab return new Graph(vertices, edges, context); } - private static final class EmitSrcAndTargetAsTuple1 & Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple1> { + private static final class EmitSrcAndTargetAsTuple1 implements FlatMapFunction< + Edge, Tuple1> { public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple1(edge.f0)); @@ -240,8 +232,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> vertices, DataSet> edges, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> vertices, + DataSet> edges, ExecutionEnvironment context) { DataSet> vertexDataSet = vertices.map(new Tuple2ToVertexMap()); DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); @@ -259,8 +251,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> edges, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> edges, + ExecutionEnvironment context) { DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); return fromDataSet(edgeDataSet, context); @@ -278,8 +270,8 @@ public class Graph & Serializable, VV extends Serializab * @param context the flink execution environment. * @return the newly created graph. */ - public static & Serializable, VV extends Serializable, EV extends Serializable> Graph fromTupleDataSet( - DataSet> edges, final MapFunction mapper, ExecutionEnvironment context) { + public static Graph fromTupleDataSet(DataSet> edges, + final MapFunction mapper, ExecutionEnvironment context) { DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); return fromDataSet(edgeDataSet, mapper, context); @@ -367,7 +359,7 @@ public class Graph & Serializable, VV extends Serializab * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public Graph mapVertices(final MapFunction, NV> mapper) { + public Graph mapVertices(final MapFunction, NV> mapper) { TypeInformation keyType = ((TupleTypeInfo) vertices.getType()).getTypeAt(0); @@ -393,7 +385,7 @@ public class Graph & Serializable, VV extends Serializab * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public Graph mapEdges(final MapFunction, NV> mapper) { + public Graph mapEdges(final MapFunction, NV> mapper) { TypeInformation keyType = ((TupleTypeInfo) edges.getType()).getTypeAt(0); @@ -430,7 +422,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(resultedVertices, this.edges, this.context); } - private static final class ApplyCoGroupToVertexValues & Serializable, VV extends Serializable, T> + private static final class ApplyCoGroupToVertexValues implements CoGroupFunction, Tuple2, Vertex> { private MapFunction, VV> mapper; @@ -479,7 +471,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValues & Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupToEdgeValues implements CoGroupFunction, Tuple3, Edge> { private MapFunction, EV> mapper; @@ -530,7 +522,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget & Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget implements CoGroupFunction, Tuple2, Edge> { private MapFunction, EV> mapper; @@ -643,8 +635,8 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFieldsFirst("0->0;1->1;2->2") - private static final class ProjectEdge & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction, Vertex, Edge> { + private static final class ProjectEdge implements FlatJoinFunction< + Edge, Vertex, Edge> { public void join(Edge first, Vertex second, Collector> out) { out.collect(first); } @@ -660,7 +652,7 @@ public class Graph & Serializable, VV extends Serializab return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup()); } - private static final class CountNeighborsCoGroup & Serializable, VV extends Serializable, EV extends Serializable> + private static final class CountNeighborsCoGroup implements CoGroupFunction, Edge, Tuple2> { @SuppressWarnings("unused") public void coGroup(Iterable> vertex, Iterable> outEdges, @@ -772,8 +764,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexIdMap & Serializable, EV extends Serializable> - implements MapFunction, Tuple2>> { + private static final class ProjectVertexIdMap implements MapFunction< + Edge, Tuple2>> { private int fieldPosition; @@ -787,8 +779,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexWithEdgeValueMap & Serializable, EV extends Serializable> - implements MapFunction, Tuple2> { + private static final class ProjectVertexWithEdgeValueMap implements MapFunction< + Edge, Tuple2> { private int fieldPosition; @@ -802,8 +794,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyGroupReduceFunction & Serializable, EV extends Serializable, T> - implements GroupReduceFunction>, T>, ResultTypeQueryable { + private static final class ApplyGroupReduceFunction implements GroupReduceFunction< + Tuple2>, T>, ResultTypeQueryable { private EdgesFunction function; @@ -821,32 +813,35 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class EmitOneEdgePerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple2>> { + private static final class EmitOneEdgePerNode implements FlatMapFunction< + Edge, Tuple2>> { + public void flatMap(Edge edge, Collector>> out) { out.collect(new Tuple2>(edge.getSource(), edge)); out.collect(new Tuple2>(edge.getTarget(), edge)); } } - private static final class EmitOneVertexWithEdgeValuePerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple2> { + private static final class EmitOneVertexWithEdgeValuePerNode implements FlatMapFunction< + Edge, Tuple2> { + public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple2(edge.getSource(), edge.getValue())); out.collect(new Tuple2(edge.getTarget(), edge.getValue())); } } - private static final class EmitOneEdgeWithNeighborPerNode & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple3>> { + private static final class EmitOneEdgeWithNeighborPerNode implements FlatMapFunction< + Edge, Tuple3>> { + public void flatMap(Edge edge, Collector>> out) { out.collect(new Tuple3>(edge.getSource(), edge.getTarget(), edge)); out.collect(new Tuple3>(edge.getTarget(), edge.getSource(), edge)); } } - private static final class ApplyCoGroupFunction & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction, Edge, T>, ResultTypeQueryable { + private static final class ApplyCoGroupFunction implements CoGroupFunction< + Vertex, Edge, T>, ResultTypeQueryable { private EdgesFunctionWithVertexValue function; @@ -866,7 +861,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyCoGroupFunctionOnAllEdges & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupFunctionOnAllEdges implements CoGroupFunction, Tuple2>, T>, ResultTypeQueryable { private EdgesFunctionWithVertexValue function; @@ -915,7 +910,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("0->1;1->0;2->2") - private static final class ReverseEdgesMap & Serializable, EV extends Serializable> + private static final class ReverseEdgesMap implements MapFunction, Edge> { public Edge map(Edge value) { @@ -955,7 +950,7 @@ public class Graph & Serializable, VV extends Serializab return vertices.map(new ExtractVertexIDMapper()); } - private static final class ExtractVertexIDMapper & Serializable, VV extends Serializable> + private static final class ExtractVertexIDMapper implements MapFunction, K> { @Override public K map(Vertex vertex) { @@ -970,7 +965,7 @@ public class Graph & Serializable, VV extends Serializab return edges.map(new ExtractEdgeIDsMapper()); } - private static final class ExtractEdgeIDsMapper & Serializable, EV extends Serializable> + private static final class ExtractEdgeIDsMapper implements MapFunction, Tuple2> { @Override public Tuple2 map(Edge edge) throws Exception { @@ -979,67 +974,6 @@ public class Graph & Serializable, VV extends Serializab } /** - * Checks the weak connectivity of a graph. - * - * @param maxIterations - * the maximum number of iterations for the inner delta iteration - * @return true if the graph is weakly connected. - */ - public boolean isWeaklyConnected(int maxIterations) throws Exception { - // first, convert to an undirected graph - Graph graph = this.getUndirected(); - - DataSet vertexIds = graph.getVertexIds(); - DataSet> verticesWithInitialIds = vertexIds - .map(new DuplicateVertexIDMapper()); - - DataSet> edgeIds = graph.getEdgeIds(); - - DeltaIteration, Tuple2> iteration = verticesWithInitialIds - .iterateDelta(verticesWithInitialIds, maxIterations, 0); - - DataSet> changes = iteration.getWorkset() - .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE) - .where(0).equalTo(0).with(new FindNeighborsJoin()) - .groupBy(0).aggregate(Aggregations.MIN, 1) - .join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0) - .with(new VertexWithNewComponentJoin()); - - DataSet> components = iteration.closeWith(changes, changes); - return components.groupBy(1).reduceGroup(new EmitFirstReducer()).count() == 1; - } - - private static final class DuplicateVertexIDMapper implements MapFunction> { - @Override - public Tuple2 map(K k) { - return new Tuple2(k, k); - } - } - - private static final class FindNeighborsJoin implements JoinFunction, Tuple2, Tuple2> { - @Override - public Tuple2 join(Tuple2 vertexWithComponent, Tuple2 edge) { - return new Tuple2(edge.f1, vertexWithComponent.f1); - } - } - - private static final class VertexWithNewComponentJoin> - implements FlatJoinFunction, Tuple2, Tuple2> { - @Override - public void join(Tuple2 candidate, Tuple2 old, Collector> out) { - if (candidate.f1.compareTo(old.f1) < 0) { - out.collect(candidate); - } - } - } - - private static final class EmitFirstReducer implements GroupReduceFunction, Tuple2> { - public void reduce(Iterable> values, Collector> out) { - out.collect(values.iterator().next()); - } - } - - /** * Adds the input vertex and edges to the graph. If the vertex already * exists in the graph, it will not be added again, but the given edges * will. @@ -1098,7 +1032,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(newVertices, newEdges, this.context); } - private static final class RemoveVertexFilter & Serializable, VV extends Serializable> + private static final class RemoveVertexFilter implements FilterFunction> { private Vertex vertexToRemove; @@ -1113,7 +1047,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class VertexRemovalEdgeFilter & Serializable, VV extends Serializable, EV extends Serializable> + private static final class VertexRemovalEdgeFilter implements FilterFunction> { private Vertex vertexToRemove; @@ -1147,7 +1081,7 @@ public class Graph & Serializable, VV extends Serializab return new Graph(this.vertices, newEdges, this.context); } - private static final class EdgeRemovalEdgeFilter & Serializable, EV extends Serializable> + private static final class EdgeRemovalEdgeFilter implements FilterFunction> { private Edge edgeToRemove; @@ -1336,7 +1270,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyNeighborGroupReduceFunction & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyNeighborGroupReduceFunction implements GroupReduceFunction, Vertex>, T>, ResultTypeQueryable { private NeighborsFunction function; @@ -1355,7 +1289,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexWithNeighborValueJoin & Serializable, VV extends Serializable, EV extends Serializable> + private static final class ProjectVertexWithNeighborValueJoin implements FlatJoinFunction, Vertex, Tuple2> { private int fieldPosition; @@ -1371,8 +1305,9 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectVertexIdJoin & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction, Vertex, Tuple3, Vertex>> { + private static final class ProjectVertexIdJoin implements FlatJoinFunction< + Edge, Vertex, Tuple3, Vertex>> { + private int fieldPosition; public ProjectVertexIdJoin(int position) { this.fieldPosition = position; @@ -1384,8 +1319,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectNeighborValue & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction>, Vertex, Tuple2> { + private static final class ProjectNeighborValue implements FlatJoinFunction< + Tuple3>, Vertex, Tuple2> { public void join(Tuple3> keysWithEdge, Vertex neighbor, Collector> out) { @@ -1394,8 +1329,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ProjectEdgeWithNeighbor & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction>, Vertex, Tuple3, Vertex>> { + private static final class ProjectEdgeWithNeighbor implements FlatJoinFunction< + Tuple3>, Vertex, Tuple3, Vertex>> { public void join(Tuple3> keysWithEdge, Vertex neighbor, Collector, Vertex>> out) { @@ -1403,8 +1338,8 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyNeighborCoGroupFunction & Serializable, VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction, Tuple2, Vertex>, T>, ResultTypeQueryable { + private static final class ApplyNeighborCoGroupFunction implements CoGroupFunction< + Vertex, Tuple2, Vertex>, T>, ResultTypeQueryable { private NeighborsFunctionWithVertexValue function; @@ -1423,7 +1358,7 @@ public class Graph & Serializable, VV extends Serializab } } - private static final class ApplyCoGroupFunctionOnAllNeighbors & Serializable, VV extends Serializable, EV extends Serializable, T> + private static final class ApplyCoGroupFunctionOnAllNeighbors implements CoGroupFunction, Tuple3, Vertex>, T>, ResultTypeQueryable { private NeighborsFunctionWithVertexValue function; @@ -1513,8 +1448,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("f0") - private static final class ApplyNeighborReduceFunction & Serializable, VV extends Serializable> - implements ReduceFunction> { + private static final class ApplyNeighborReduceFunction implements ReduceFunction> { private ReduceNeighborsFunction function; @@ -1561,8 +1495,7 @@ public class Graph & Serializable, VV extends Serializab } @ForwardedFields("f0") - private static final class ApplyReduceFunction & Serializable, EV extends Serializable> - implements ReduceFunction> { + private static final class ApplyReduceFunction implements ReduceFunction> { private ReduceEdgesFunction function; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java index ceeeaf4..04181d5 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java @@ -18,14 +18,12 @@ package org.apache.flink.graph; -import java.io.Serializable; - /** * @param key type * @param vertex value type * @param edge value type */ -public interface GraphAlgorithm & Serializable, VV extends Serializable, EV extends Serializable> { +public interface GraphAlgorithm { public Graph run(Graph input) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index b43f9d1..a21b23d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface NeighborsFunction & Serializable, VV extends Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface NeighborsFunction extends Function, Serializable { void iterateNeighbors(Iterable, Vertex>> neighbors, Collector out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index 32d184d..fdf54fa 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -34,8 +34,7 @@ import org.apache.flink.util.Collector; * @param the edge value type * @param the type of the return value */ -public interface NeighborsFunctionWithVertexValue & Serializable, VV extends Serializable, - EV extends Serializable, O> extends Function, Serializable { +public interface NeighborsFunctionWithVertexValue extends Function, Serializable { void iterateNeighbors(Vertex vertex, Iterable, Vertex>> neighbors, Collector out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java index 707efbf..84eec51 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -28,7 +28,7 @@ import java.io.Serializable; * * @param the edge value type */ -public interface ReduceEdgesFunction extends Function, Serializable { +public interface ReduceEdgesFunction extends Function, Serializable { EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue); } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java index 4b5a930..fc5295d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -29,7 +29,7 @@ import java.io.Serializable; * * @param the vertex value type */ -public interface ReduceNeighborsFunction extends Function, Serializable { +public interface ReduceNeighborsFunction extends Function, Serializable { VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue); } http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java index b85987d..dee3480 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java @@ -20,8 +20,6 @@ package org.apache.flink.graph; import org.apache.flink.api.java.tuple.Tuple5; -import java.io.Serializable; - /** * A Triplet stores and retrieves the edges along with their corresponding source and target vertices. * Triplets can be obtained from the input graph via the {@link org.apache.flink.graph.Graph#getTriplets()} method. @@ -30,8 +28,7 @@ import java.io.Serializable; * @param the vertex value type * @param the edge value type */ -public class Triplet & Serializable, VV extends Serializable, EV extends Serializable> - extends Tuple5 { +public class Triplet extends Tuple5 { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java index 2f71843..c5eb973 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java @@ -18,8 +18,6 @@ package org.apache.flink.graph; -import java.io.Serializable; - import org.apache.flink.api.java.tuple.Tuple2; /** @@ -29,8 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; * @param * @param */ -public class Vertex & Serializable, V extends Serializable> - extends Tuple2 { +public class Vertex extends Tuple2 { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index a4963e0..7d24253 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -25,8 +25,7 @@ import org.apache.flink.util.Collector; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class ApplyFunction & Serializable, VV extends Serializable, M> - implements Serializable { +public abstract class ApplyFunction implements Serializable { public abstract void apply(M newValue, VV currentValue); http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index 4ffae8d..1c4b2c4 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class GatherFunction implements Serializable { +public abstract class GatherFunction implements Serializable { public abstract M gather(Neighbor neighbor); http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index 22be591..1de3839 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -39,8 +39,6 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; -import java.io.Serializable; - /** * This class represents iterative graph computations, programmed in a gather-sum-apply perspective. * @@ -49,8 +47,7 @@ import java.io.Serializable; * @param The type of the edge value in the graph * @param The intermediate type used by the gather, sum and apply functions */ -public class GatherSumApplyIteration & Serializable, - VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation, +public class GatherSumApplyIteration implements CustomUnaryOperation, Vertex> { private DataSet> vertexDataSet; @@ -159,10 +156,10 @@ public class GatherSumApplyIteration & Serializable, * * @return An in stance of the gather-sum-apply graph computation operator. */ - public static final & Serializable, VV extends Serializable, EV extends Serializable, M> - GatherSumApplyIteration withEdges(DataSet> edges, - GatherFunction gather, SumFunction sum, ApplyFunction apply, - int maximumNumberOfIterations) { + public static final GatherSumApplyIteration + withEdges(DataSet> edges, GatherFunction gather, + SumFunction sum, ApplyFunction apply, int maximumNumberOfIterations) { + return new GatherSumApplyIteration(gather, sum, apply, edges, maximumNumberOfIterations); } @@ -172,8 +169,7 @@ public class GatherSumApplyIteration & Serializable, @SuppressWarnings("serial") @ForwardedFields("f0") - private static final class GatherUdf & Serializable, VV extends Serializable, - EV extends Serializable, M> extends RichMapFunction>, + private static final class GatherUdf extends RichMapFunction>, Tuple2> implements ResultTypeQueryable> { private final GatherFunction gatherFunction; @@ -210,8 +206,7 @@ public class GatherSumApplyIteration & Serializable, } @SuppressWarnings("serial") - private static final class SumUdf & Serializable, VV extends Serializable, - EV extends Serializable, M> extends RichReduceFunction> + private static final class SumUdf extends RichReduceFunction> implements ResultTypeQueryable>{ private final SumFunction sumFunction; @@ -249,8 +244,7 @@ public class GatherSumApplyIteration & Serializable, } @SuppressWarnings("serial") - private static final class ApplyUdf & Serializable, - VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction, + private static final class ApplyUdf extends RichFlatJoinFunction, Vertex, Vertex> implements ResultTypeQueryable> { private final ApplyFunction applyFunction; @@ -289,8 +283,7 @@ public class GatherSumApplyIteration & Serializable, @SuppressWarnings("serial") @ForwardedFieldsSecond("f1->f0") - private static final class ProjectKeyWithNeighbor & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatJoinFunction< + private static final class ProjectKeyWithNeighbor implements FlatJoinFunction< Vertex, Edge, Tuple2>> { public void join(Vertex vertex, Edge edge, Collector>> out) { http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java index 5a06af9..7fa1ed2 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java @@ -20,8 +20,6 @@ package org.apache.flink.graph.gsa; import org.apache.flink.api.java.tuple.Tuple2; -import java.io.Serializable; - /** * This class represents a pair * This is a wrapper around Tuple2 for convenience in the GatherFunction @@ -29,8 +27,7 @@ import java.io.Serializable; * @param the edge value type */ @SuppressWarnings("serial") -public class Neighbor - extends Tuple2 { +public class Neighbor extends Tuple2 { public Neighbor() {} http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 4836af6..0a5e4ae 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext; import java.io.Serializable; @SuppressWarnings("serial") -public abstract class SumFunction implements Serializable { +public abstract class SumFunction implements Serializable { public abstract M sum(M arg0, M arg1); http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index ff6fe85..d63a4c3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -25,7 +25,6 @@ import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; -import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -41,8 +40,7 @@ import java.util.Map.Entry; * */ @SuppressWarnings("serial") -public class LabelPropagation & Serializable> - implements GraphAlgorithm { +public class LabelPropagation implements GraphAlgorithm { private final int maxIterations; @@ -63,8 +61,7 @@ public class LabelPropagation & Serializable> * Function that updates the value of a vertex by adopting the most frequent * label among its in-neighbors */ - public static final class UpdateVertexLabel & Serializable> - extends VertexUpdateFunction { + public static final class UpdateVertexLabel extends VertexUpdateFunction { public void updateVertex(K vertexKey, Long vertexValue, MessageIterator inMessages) { @@ -105,8 +102,7 @@ public class LabelPropagation & Serializable> /** * Sends the vertex label to all out-neighbors */ - public static final class SendNewLabelToNeighbors & Serializable> - extends MessagingFunction { + public static final class SendNewLabelToNeighbors extends MessagingFunction { public void sendMessages(K vertexKey, Long newLabel) { sendMessageToAllNeighbors(newLabel); http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index 48c9a51..bb0a1d1 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -18,8 +18,6 @@ package org.apache.flink.graph.library; -import java.io.Serializable; - import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -27,8 +25,7 @@ import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; -public class PageRank & Serializable> implements - GraphAlgorithm { +public class PageRank implements GraphAlgorithm { private double beta; private int maxIterations; @@ -51,8 +48,7 @@ public class PageRank & Serializable> implements * ranks from all incoming messages and then applying the dampening formula. */ @SuppressWarnings("serial") - public static final class VertexRankUpdater & Serializable> - extends VertexUpdateFunction { + public static final class VertexRankUpdater extends VertexUpdateFunction { private final double beta; private final long numVertices; @@ -82,8 +78,7 @@ public class PageRank & Serializable> implements * value. */ @SuppressWarnings("serial") - public static final class RankMessenger & Serializable> - extends MessagingFunction { + public static final class RankMessenger extends MessagingFunction { private final long numVertices; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 262b2c5..f4f8b27 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -27,11 +27,8 @@ import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; -import java.io.Serializable; - @SuppressWarnings("serial") -public class SingleSourceShortestPaths & Serializable> - implements GraphAlgorithm { +public class SingleSourceShortestPaths implements GraphAlgorithm { private final K srcVertexId; private final Integer maxIterations; @@ -49,8 +46,7 @@ public class SingleSourceShortestPaths & Serializable> maxIterations); } - public static final class InitVerticesMapper & Serializable> - implements MapFunction, Double> { + public static final class InitVerticesMapper implements MapFunction, Double> { private K srcVertexId; @@ -73,8 +69,7 @@ public class SingleSourceShortestPaths & Serializable> * * @param */ - public static final class VertexDistanceUpdater & Serializable> - extends VertexUpdateFunction { + public static final class VertexDistanceUpdater extends VertexUpdateFunction { @Override public void updateVertex(K vertexKey, Double vertexValue, @@ -100,8 +95,7 @@ public class SingleSourceShortestPaths & Serializable> * * @param */ - public static final class MinDistanceMessenger & Serializable> - extends MessagingFunction { + public static final class MinDistanceMessenger extends MessagingFunction { @Override public void sendMessages(K vertexKey, Double newDistance) http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index e8a297f..b7e74e3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -38,8 +38,7 @@ import org.apache.flink.util.Collector; * @param The type of the message sent between vertices along the edges. * @param The type of the values that are associated with the edges. */ -public abstract class MessagingFunction & Serializable, - VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable { +public abstract class MessagingFunction implements Serializable { private static final long serialVersionUID = 1L; @@ -198,8 +197,7 @@ public abstract class MessagingFunction this.edgesUsed = false; } - private static final class EdgesIterator & Serializable, - EdgeValue extends Serializable> + private static final class EdgesIterator implements Iterator>, Iterable> { private Iterator> input; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java index ca66521..5ad1420 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.spargel; -import java.io.Serializable; import java.util.Iterator; import java.util.Map; @@ -69,8 +68,7 @@ import org.apache.flink.util.Collector; * @param The type of the message sent between vertices along the edges. * @param The type of the values that are associated with the edges. */ -public class VertexCentricIteration & Serializable, VertexValue extends Serializable, - Message, EdgeValue extends Serializable> +public class VertexCentricIteration implements CustomUnaryOperation, Vertex> { private final VertexUpdateFunction updateFunction; @@ -218,8 +216,7 @@ public class VertexCentricIteration & Se * * @return An in stance of the vertex-centric graph computation operator. */ - public static final & Serializable, VertexValue extends Serializable, - Message, EdgeValue extends Serializable> + public static final VertexCentricIteration withEdges( DataSet> edgesWithValue, VertexUpdateFunction uf, @@ -233,8 +230,7 @@ public class VertexCentricIteration & Se // Wrapping UDFs // -------------------------------------------------------------------------------------------- - private static final class VertexUpdateUdf & Serializable, - VertexValue extends Serializable, Message> + private static final class VertexUpdateUdf extends RichCoGroupFunction, Vertex, Vertex> implements ResultTypeQueryable> { @@ -308,8 +304,7 @@ public class VertexCentricIteration & Se /* * UDF that encapsulates the message sending function for graphs where the edges have an associated value. */ - private static final class MessagingUdfWithEdgeValues & Serializable, - VertexValue extends Serializable, Message, EdgeValue extends Serializable> + private static final class MessagingUdfWithEdgeValues extends RichCoGroupFunction, Vertex, Tuple2> implements ResultTypeQueryable> { http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java index 5a7cd5c..561c87a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java @@ -36,8 +36,7 @@ import org.apache.flink.util.Collector; * The vertex value type. * The message type. */ -public abstract class VertexUpdateFunction & Serializable, - VertexValue extends Serializable, Message> implements Serializable { +public abstract class VertexUpdateFunction implements Serializable { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java index a7b7b62..c83fc9c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; -public class EdgeToTuple3Map & Serializable, - EV extends Serializable> implements MapFunction, Tuple3> { +public class EdgeToTuple3Map implements MapFunction, Tuple3> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java index d58e4ff..f9645dc 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Vertex; -public class Tuple2ToVertexMap & Serializable, - VV extends Serializable> implements MapFunction, Vertex> { +public class Tuple2ToVertexMap implements MapFunction, Vertex> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java index 3668dd2..afeff89 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java @@ -18,8 +18,6 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; @@ -30,8 +28,7 @@ import org.apache.flink.graph.Edge; * @param * @param */ -public class Tuple3ToEdgeMap & Serializable, - EV extends Serializable> implements MapFunction, Edge> { +public class Tuple3ToEdgeMap implements MapFunction, Edge> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java index 318e1ed..9ce6f33 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java @@ -18,14 +18,11 @@ package org.apache.flink.graph.utils; -import java.io.Serializable; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Vertex; -public class VertexToTuple2Map & Serializable, - VV extends Serializable> implements MapFunction, Tuple2> { +public class VertexToTuple2Map implements MapFunction, Tuple2> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java index 101e82c..75b672c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java @@ -30,8 +30,7 @@ import org.apache.flink.graph.Graph; * @param the edge value type */ @SuppressWarnings("serial") -public abstract class GraphValidator & Serializable, VV extends Serializable, EV extends Serializable> - implements Serializable { +public abstract class GraphValidator implements Serializable { public abstract boolean validate(Graph graph) throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java index cc06ca7..33d469b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java @@ -28,11 +28,8 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; -import java.io.Serializable; - @SuppressWarnings("serial") -public class InvalidVertexIdsValidator & Serializable, VV extends Serializable, EV extends Serializable> - extends GraphValidator { +public class InvalidVertexIdsValidator extends GraphValidator { /** * Checks that the edge set input contains valid vertex Ids, i.e. that they @@ -51,16 +48,14 @@ public class InvalidVertexIdsValidator & Serializable, V return invalidIds.map(new KToTupleMap()).count() == 0; } - private static final class MapEdgeIds & Serializable, EV extends Serializable> - implements FlatMapFunction, Tuple1> { + private static final class MapEdgeIds implements FlatMapFunction, Tuple1> { public void flatMap(Edge edge, Collector> out) { out.collect(new Tuple1(edge.f0)); out.collect(new Tuple1(edge.f1)); } } - private static final class GroupInvalidIds & Serializable, VV extends Serializable> - implements CoGroupFunction, Tuple1, K> { + private static final class GroupInvalidIds implements CoGroupFunction, Tuple1, K> { public void coGroup(Iterable> vertexId, Iterable> edgeId, Collector out) { if (!(vertexId.iterator().hasNext())) { http://git-wip-us.apache.org/repos/asf/flink/blob/ce2163e6/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java deleted file mode 100644 index 9db449e..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class WeaklyConnectedITCase extends MultipleProgramsTestBase { - - public WeaklyConnectedITCase(TestExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithConnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } - - @Test - public void testWithConnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); - - env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } -}