From: vasia@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Message-Id: <9e0e8064f1e24048a996beb572ae5af7@git.apache.org>
Subject: flink git commit: [FLINK-2905] [gelly] Add Graph Intersection method
Date: Mon, 9 Nov 2015 13:23:01 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master c60754956 -> 3ade4c790


[FLINK-2905] [gelly] Add Graph Intersection method

This closes #1329


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ade4c79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ade4c79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ade4c79

Branch: refs/heads/master
Commit: 3ade4c79016b95ac5e38d7d2a16277a70e095463
Parents: c607549
Author: Martin Junghanns
Authored: Thu Oct 29 14:42:37 2015 +0100
Committer: vasia
Committed: Mon Nov 9 13:49:28 2015 +0100

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 57 +++++++++++-
 .../org/apache/flink/graph/scala/Graph.scala    | 23 +++++
 .../main/java/org/apache/flink/graph/Graph.java | 94 +++++++++++++++++++-
 .../test/operations/GraphOperationsITCase.java  | 90 ++++++++++++++++++-
 4 files changed, 258 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 59e1a3b..e4684f8 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -485,13 +485,66 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do

 * Undirected: In Gelly, a `Graph` is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the `getUndirected()` method.

-* Union: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.
+* Union: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exist, these will be preserved.

Union Transformation

-* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and specified graph.
+* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and the specified graph.
+
+* Intersect: Gelly's `intersect()` method performs an intersect on the edge
+  sets of the current graph and the specified graph. The result is a new `Graph` that contains all
+  edges that exist in both input graphs. Two edges are considered equal, if they have the same source
+  identifier, target identifier and edge value. Vertices in the resulting graph have no
+  value. If vertex values are required, one can for example retrieve them from one of the input graphs using
+  the `joinWithVertices()` method.
+  Depending on the parameter `distinct`, equal edges are either contained once in the resulting
+  `Graph` or as often as there are pairs of equal edges in the input graphs.
+
+
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
+List<Edge<Long, Long>> edges1 = ...
+Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+
+// create second graph from edges {(1, 3, 13)}
+List<Edge<Long, Long>> edges2 = ...
+Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+// Using distinct = true results in {(1,3,13)}
+Graph<Long, NullValue, Long> intersect1 = graph1.intersect(graph2, true);
+
+// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
+Graph<Long, NullValue, Long> intersect2 = graph1.intersect(graph2, false);
+
+{% endhighlight %}
+
+ +
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create first graph from edges {(1, 3, 12) (1, 3, 13), (1, 3, 13)}
+val edges1: List[Edge[Long, Long]] = ...
+val graph1 = Graph.fromCollection(edges1, env)
+
+// create second graph from edges {(1, 3, 13)}
+val edges2: List[Edge[Long, Long]] = ...
+val graph2 = Graph.fromCollection(edges2, env)
+
+
+// Using distinct = true results in {(1,3,13)}
+val intersect1 = graph1.intersect(graph2, true)
+
+// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
+val intersect2 = graph1.intersect(graph2, false)
+{% endhighlight %}
+
+
-[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index e51453e..3a0843a 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -968,6 +968,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }

   /**
+   * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they
+   * have the same source identifier, target identifier and edge value.
+   *
+   * The method computes pairs of equal edges from the input graphs. If the same edge occurs
+   * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
+   * edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set
+   * to {@code true}, there will be exactly one edge in the output graph representing all pairs of
+   * equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the
+   * output.
+   *
+   * Vertices in the output graph will have no vertex values.
+   *
+   * @param graph the graph to perform intersect with
+   * @param distinctEdges if set to { @code true}, there will be exactly one edge in the output
+   *                      graph representing all pairs of equal edges, otherwise, for each pair,
+   *                      both edges will be in the output graph
+   * @return a new graph which contains only common vertices and edges from the input graphs
+   */
+  def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV] = {
+    wrapGraph(jgraph.intersect(graph.getWrappedGraph, distinctEdges))
+  }
+
+  /**
    * Compute a reduce transformation over the neighbors' vertex values of each vertex.
    * For each vertex, the transformation consecutively calls a
    * {@link ReduceNeighborsFunction} until only a single value for each vertex remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 4f603f7..cf16fd3 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -60,8 +60,8 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.graph.validation.GraphValidator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;

 /**
  * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
@@ -174,7 +174,7 @@ public class Graph<K, VV, EV> {
     }

     private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
-        Edge<K, EV>, Vertex<K, NullValue>> {
+        Edge<K, EV>, Vertex<K, NullValue>> {

         public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
             out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
@@ -1477,7 +1477,6 @@ public class Graph<K, VV, EV> {
      * @return a new graph
      */
     public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
-
         DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
         DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
         return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
@@ -1497,6 +1496,95 @@ public class Graph<K, VV, EV> {
     }

     /**
+     * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they
+     * have the same source identifier, target identifier and edge value.
+     *
+     * The method computes pairs of equal edges from the input graphs. If the same edge occurs
+     * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
+     * edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set
+     * to {@code true}, there will be exactly one edge in the output graph representing all pairs of
+     * equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the
+     * output.
+     *

+     * Vertices in the output graph will have no vertex values.
+     *
+     * @param graph the graph to perform intersect with
+     * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph
+     *                      representing all pairs of equal edges, otherwise, for each pair, both
+     *                      edges will be in the output graph
+     * @return a new graph which contains only common vertices and edges from the input graphs
+     */
+    public Graph<K, NullValue, EV> intersect(Graph<K, VV, EV> graph, boolean distinctEdges) {
+        DataSet<Edge<K, EV>> intersectEdges;
+        if (distinctEdges) {
+            intersectEdges = getDistinctEdgeIntersection(graph.getEdges());
+        } else {
+            intersectEdges = getPairwiseEdgeIntersection(graph.getEdges());
+        }
+
+        return Graph.fromDataSet(intersectEdges, getContext());
+    }
+
+    /**
+     * Computes the intersection between the edge set and the given edge set. For all matching pairs,
+     * only one edge will be in the resulting data set.
+     *
+     * @param edges edges to compute intersection with
+     * @return edge set containing one edge for all matching pairs of the same edge
+     */
+    private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K, EV>> edges) {
+        return this.getEdges()
+            .join(edges)
+            .where(0, 1, 2)
+            .equalTo(0, 1, 2)
+            .with(new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>() {
+                @Override
+                public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
+                    return first;
+                }
+            }).withForwardedFieldsFirst("*")
+            .distinct();
+    }
+
+    /**
+     * Computes the intersection between the edge set and the given edge set. For all matching pairs, both edges will be
+     * in the resulting data set.
+     *
+     * @param edges edges to compute intersection with
+     * @return edge set containing both edges from all matching pairs of the same edge
+     */
+    private DataSet<Edge<K, EV>> getPairwiseEdgeIntersection(DataSet<Edge<K, EV>> edges) {
+        return this.getEdges()
+            .coGroup(edges)
+            .where(0, 1, 2)
+            .equalTo(0, 1, 2)
+            .with(new MatchingEdgeReducer<K, EV>());
+    }
+
+    /**
+     * As long as both input iterables have more edges, the reducer outputs each edge of a pair.
+     *
+     * @param <K> vertex identifier type
+     * @param <EV> edge value type
+     */
+    private static final class MatchingEdgeReducer<K, EV>
+            implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
+
+        @Override
+        public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K, EV>> edgesRight, Collector<Edge<K, EV>> out)
+                throws Exception {
+            Iterator<Edge<K, EV>> leftIt = edgesLeft.iterator();
+            Iterator<Edge<K, EV>> rightIt = edgesRight.iterator();
+
+            // collect pairs once
+            while(leftIt.hasNext() && rightIt.hasNext()) {
+                out.collect(leftIt.next());
+                out.collect(rightIt.next());
+            }
+        }
+    }
+
+    /**
      * Runs a Vertex-Centric iteration on the graph.
      * No configuration options are provided.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index ffc9da9..18cf4c4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -21,9 +21,12 @@ package org.apache.flink.graph.test.operations;
 import java.util.ArrayList;
 import java.util.List;

+import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -31,6 +34,7 @@ import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -302,7 +306,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
         compareResultAsTuples(result, expectedResult);
     }

-
     @Test
     public void testDifferenceVertices() throws Exception{
         /*Test difference() method by checking the output for getVertices() on the resultant graph
@@ -356,6 +359,91 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
     }

     @Test
+    public final void testIntersect() throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        @SuppressWarnings("unchecked")
+        List<Edge<Long, Long>> edges1 = Lists.newArrayList(
+            new Edge<>(1L, 3L, 12L),
+            new Edge<>(1L, 3L, 13L), // needs to be in the output
+            new Edge<>(1L, 3L, 14L)
+        );
+
+        @SuppressWarnings("unchecked")
+        List<Edge<Long, Long>> edges2 = Lists.newArrayList(
+            new Edge<>(1L, 3L, 13L)
+        );
+
+        Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+        Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+        Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, true);
+
+        List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
+        List<Edge<Long, Long>> edges = Lists.newArrayList();
+
+        intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
+        intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
+
+        env.execute();
+
+        String expectedVertices = "1,(null)\n" +
+            "3,(null)\n";
+
+        String expectedEdges = "1,3,13\n";
+
+        compareResultAsTuples(vertices, expectedVertices);
+        compareResultAsTuples(edges, expectedEdges);
+    }
+
+    @Test
+    public final void testIntersectWithPairs() throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        @SuppressWarnings("unchecked")
+        List<Edge<Long, Long>> edges1 = Lists.newArrayList(
+            new Edge<>(1L, 3L, 12L),
+            new Edge<>(1L, 3L, 13L),
+            new Edge<>(1L, 3L, 13L), // output
+            new Edge<>(1L, 3L, 13L), // output
+            new Edge<>(1L, 3L, 14L) // output
+        );
+
+        @SuppressWarnings("unchecked")
+        List<Edge<Long, Long>> edges2 = Lists.newArrayList(
+            new Edge<>(1L, 3L, 13L), // output
+            new Edge<>(1L, 3L, 13L), // output
+            new Edge<>(1L, 3L, 14L) // output
+        );
+
+        Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+        Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+        Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, false);
+
+        List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
+        List<Edge<Long, Long>> edges = Lists.newArrayList();
+
+        intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
+        intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
+
+        env.execute();
+
+        String expectedVertices = "1,(null)\n" +
+            "3,(null)\n";
+
+        String expectedEdges = "1,3,13\n" +
+            "1,3,13\n" +
+            "1,3,13\n" +
+            "1,3,13\n" +
+            "1,3,14\n" +
+            "1,3,14";
+
+        compareResultAsTuples(vertices, expectedVertices);
+        compareResultAsTuples(edges, expectedEdges);
+    }
+
+    @Test
     public void testTriplets() throws Exception {
         /*
          * Test getTriplets()