flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject flink git commit: [FLINK-2905] [gelly] Add Graph Intersection method
Date Mon, 09 Nov 2015 13:23:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master c60754956 -> 3ade4c790


[FLINK-2905] [gelly] Add Graph Intersection method

This closes #1329


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ade4c79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ade4c79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ade4c79

Branch: refs/heads/master
Commit: 3ade4c79016b95ac5e38d7d2a16277a70e095463
Parents: c607549
Author: Martin Junghanns <m.junghanns@mailbox.org>
Authored: Thu Oct 29 14:42:37 2015 +0100
Committer: vasia <vasia@apache.org>
Committed: Mon Nov 9 13:49:28 2015 +0100

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 57 +++++++++++-
 .../org/apache/flink/graph/scala/Graph.scala    | 23 +++++
 .../main/java/org/apache/flink/graph/Graph.java | 94 +++++++++++++++++++-
 .../test/operations/GraphOperationsITCase.java  | 90 ++++++++++++++++++-
 4 files changed, 258 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 59e1a3b..e4684f8 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -485,13 +485,66 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
(v1: Do
 
 * <strong>Undirected</strong>: In Gelly, a `Graph` is always directed. Undirected
graphs can be represented by adding all opposite-direction edges to a graph. For this purpose,
Gelly provides the `getUndirected()` method.
 
-* <strong>Union</strong>: Gelly's `union()` method performs a union operation
on the vertex and edge sets of the specified graph and current graph. Duplicate vertices are
removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.
+* <strong>Union</strong>: Gelly's `union()` method performs a union operation
on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices
are removed from the resulting `Graph`, while if duplicate edges exist, these will be preserved.
 
 <p class="text-center">
     <img alt="Union Transformation" width="50%" src="fig/gelly-union.png"/>
 </p>
 
-* <strong>Difference</strong>: Gelly's `difference()` method performs a difference
on the vertex and edge sets of the current graph and specified graph.
+* <strong>Difference</strong>: Gelly's `difference()` method performs a difference
on the vertex and edge sets of the current graph and the specified graph.
+
+* <strong>Intersect</strong>: Gelly's `intersect()` method performs an intersection
on the edge
+ sets of the current graph and the specified graph. The result is a new `Graph` that contains
all 
+ edges that exist in both input graphs. Two edges are considered equal, if they have the
same source
+ identifier, target identifier and edge value. Vertices in the resulting graph have no 
+ value. If vertex values are required, one can for example retrieve them from one of the
input graphs using 
+ the `joinWithVertices()` method.
+ Depending on the parameter `distinctEdges`, equal edges are either contained once in the resulting

+ `Graph` or as often as there are pairs of equal edges in the input graphs.
+ 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// create first graph from edges {(1, 3, 12), (1, 3, 13), (1, 3, 13)}
+List<Edge<Long, Long>> edges1 = ...
+Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+
+// create second graph from edges {(1, 3, 13)}
+List<Edge<Long, Long>> edges2 = ...
+Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+// Using distinct = true results in {(1,3,13)}
+Graph<Long, NullValue, Long> intersect1 = graph1.intersect(graph2, true);
+
+// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
+Graph<Long, NullValue, Long> intersect2 = graph1.intersect(graph2, false);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create first graph from edges {(1, 3, 12), (1, 3, 13), (1, 3, 13)}
+val edges1: List[Edge[Long, Long]] = ...
+val graph1 = Graph.fromCollection(edges1, env)
+
+// create second graph from edges {(1, 3, 13)}
+val edges2: List[Edge[Long, Long]] = ...
+val graph2 = Graph.fromCollection(edges2, env)
+
+
+// Using distinct = true results in {(1,3,13)}
+val intersect1 = graph1.intersect(graph2, true)
+
+// Using distinct = false results in {(1,3,13),(1,3,13)} as there is one edge pair
+val intersect2 = graph1.intersect(graph2, false)
+{% endhighlight %}
+</div>
+</div>
 
 -[Back to top](#top)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index e51453e..3a0843a 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -968,6 +968,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+   * Performs intersect on the edge sets of the input graphs. Edges are considered equal,
if they
+   * have the same source identifier, target identifier and edge value.
+   * <p>
+   * The method computes pairs of equal edges from the input graphs. If the same edge occurs
+   * multiple times in the input graphs, there will be multiple edge pairs to be considered.
Each
+   * edge instance can only be part of one pair. If the given parameter {@code distinctEdges}
is set
+   * to {@code true}, there will be exactly one edge in the output graph representing all
pairs of
+   * equal edges. If the parameter is set to {@code false}, both edges of each pair will
be in the
+   * output.
+   * <p>
+   * Vertices in the output graph will have no vertex values.
+   *
+   * @param graph the graph to perform intersect with
+   * @param distinctEdges if set to {@code true}, there will be exactly one edge in the
output
+   *                      graph representing all pairs of equal edges, otherwise, for each
pair,
+   *                      both edges will be in the output graph
+   * @return a new graph containing only the edges common to both input graphs and their endpoints
+   */
+  def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
= {
+    wrapGraph(jgraph.intersect(graph.getWrappedGraph, distinctEdges))
+  }
+
+  /**
    * Compute a reduce transformation over the neighbors' vertex values of each vertex.
    * For each vertex, the transformation consecutively calls a
    * {@link ReduceNeighborsFunction} until only a single value for each vertex remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 4f603f7..cf16fd3 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -60,8 +60,8 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.graph.validation.GraphValidator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
 
 /**
  * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
@@ -174,7 +174,7 @@ public class Graph<K, VV, EV> {
 	}
 
 	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
-		Edge<K, EV>, Vertex<K, NullValue>> {
+			Edge<K, EV>, Vertex<K, NullValue>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>>
out) {
 			out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
@@ -1477,7 +1477,6 @@ public class Graph<K, VV, EV> {
 	 * @return a new graph
 	 */
 	public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
-
 		DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
 		DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
 		return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
@@ -1497,6 +1496,95 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	 * Performs intersect on the edge sets of the input graphs. Edges are considered equal,
if they
+	 * have the same source identifier, target identifier and edge value.
+	 * <p>
+	 * The method computes pairs of equal edges from the input graphs. If the same edge occurs
+	 * multiple times in the input graphs, there will be multiple edge pairs to be considered.
Each
+	 * edge instance can only be part of one pair. If the given parameter {@code distinctEdges}
is set
+	 * to {@code true}, there will be exactly one edge in the output graph representing all
pairs of
+	 * equal edges. If the parameter is set to {@code false}, both edges of each pair will be
in the
+	 * output.
+	 * <p>
+	 * Vertices in the output graph will have no vertex values.
+	 *
+	 * @param graph the graph to perform intersect with
+	 * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output
graph
+	 *                      representing all pairs of equal edges, otherwise, for each pair,
both
+	 *                      edges will be in the output graph
+	 * @return a new graph containing only the edges common to both input graphs and their endpoints
+	 */
+	public Graph<K, NullValue, EV> intersect(Graph<K, VV, EV> graph, boolean distinctEdges)
{
+		DataSet<Edge<K, EV>> intersectEdges;
+		if (distinctEdges) {
+			intersectEdges = getDistinctEdgeIntersection(graph.getEdges());
+		} else {
+			intersectEdges = getPairwiseEdgeIntersection(graph.getEdges());
+		}
+
+		return Graph.fromDataSet(intersectEdges, getContext());
+	}
+
+	/**
+	 * Computes the intersection between the edge set and the given edge set. For all matching
pairs,
+	 * only one edge will be in the resulting data set.
+	 *
+	 * @param edges edges to compute intersection with
+	 * @return edge set containing one edge for all matching pairs of the same edge
+	 */
+	private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K,
EV>> edges) {
+		return this.getEdges()
+				.join(edges)
+				.where(0, 1, 2)
+				.equalTo(0, 1, 2)
+				.with(new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>()
{
+					@Override
+					public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws
Exception {
+						return first;
+					}
+				}).withForwardedFieldsFirst("*")
+				.distinct();
+	}
+
+	/**
+	 * Computes the intersection between the edge set and the given edge set. For all matching
pairs, both edges will be
+	 * in the resulting data set.
+	 *
+	 * @param edges edges to compute intersection with
+	 * @return edge set containing both edges from all matching pairs of the same edge
+	 */
+	private DataSet<Edge<K, EV>> getPairwiseEdgeIntersection(DataSet<Edge<K,
EV>> edges) {
+		return this.getEdges()
+				.coGroup(edges)
+				.where(0, 1, 2)
+				.equalTo(0, 1, 2)
+				.with(new MatchingEdgeReducer<K, EV>());
+	}
+
+	/**
+	 * As long as both input iterables have more edges, the reducer outputs each edge of a pair.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <EV> 	edge value type
+	 */
+	private static final class MatchingEdgeReducer<K, EV>
+			implements CoGroupFunction<Edge<K,EV>, Edge<K,EV>, Edge<K, EV>>
{
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K,
EV>> edgesRight, Collector<Edge<K, EV>> out)
+				throws Exception {
+			Iterator<Edge<K, EV>> leftIt = edgesLeft.iterator();
+			Iterator<Edge<K, EV>> rightIt = edgesRight.iterator();
+
+			// collect pairs once
+			while(leftIt.hasNext() && rightIt.hasNext()) {
+				out.collect(leftIt.next());
+				out.collect(rightIt.next());
+			}
+		}
+	}
+
+	/**
 	 * Runs a Vertex-Centric iteration on the graph.
 	 * No configuration options are provided.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/3ade4c79/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index ffc9da9..18cf4c4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -21,9 +21,12 @@ package org.apache.flink.graph.test.operations;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -31,6 +34,7 @@ import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -302,7 +306,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expectedResult);
 	}
 
-
 	@Test
 	public void testDifferenceVertices() throws Exception{
 		/*Test  difference() method  by checking    the output  for getVertices()   on  the resultant
  graph
@@ -356,6 +359,91 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public final void testIntersect() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings("unchecked")
+		List<Edge<Long, Long>> edges1 = Lists.newArrayList(
+				new Edge<>(1L, 3L, 12L),
+				new Edge<>(1L, 3L, 13L), // needs to be in the output
+				new Edge<>(1L, 3L, 14L)
+		);
+
+		@SuppressWarnings("unchecked")
+		List<Edge<Long, Long>> edges2 = Lists.newArrayList(
+				new Edge<>(1L, 3L, 13L)
+		);
+
+		Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+		Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+		Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, true);
+
+		List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
+		List<Edge<Long, Long>> edges = Lists.newArrayList();
+
+		intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
+		intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
+
+		env.execute();
+
+		String expectedVertices = "1,(null)\n" +
+				"3,(null)\n";
+
+		String expectedEdges = "1,3,13\n";
+
+		compareResultAsTuples(vertices, expectedVertices);
+		compareResultAsTuples(edges, expectedEdges);
+	}
+
+	@Test
+	public final void testIntersectWithPairs() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings("unchecked")
+		List<Edge<Long, Long>> edges1 = Lists.newArrayList(
+				new Edge<>(1L, 3L, 12L),
+				new Edge<>(1L, 3L, 13L),
+				new Edge<>(1L, 3L, 13L), // output
+				new Edge<>(1L, 3L, 13L), // output
+				new Edge<>(1L, 3L, 14L)  // output
+		);
+
+		@SuppressWarnings("unchecked")
+		List<Edge<Long, Long>> edges2 = Lists.newArrayList(
+				new Edge<>(1L, 3L, 13L), // output
+				new Edge<>(1L, 3L, 13L), // output
+				new Edge<>(1L, 3L, 14L)  // output
+		);
+
+		Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+		Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+		Graph<Long, NullValue, Long> intersect = graph1.intersect(graph2, false);
+
+		List<Vertex<Long, NullValue>> vertices = Lists.newArrayList();
+		List<Edge<Long, Long>> edges = Lists.newArrayList();
+
+		intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices));
+		intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges));
+
+		env.execute();
+
+		String expectedVertices = "1,(null)\n" +
+				"3,(null)\n";
+
+		String expectedEdges = "1,3,13\n" +
+				"1,3,13\n" +
+				"1,3,13\n" +
+				"1,3,13\n" +
+				"1,3,14\n" +
+				"1,3,14";
+
+		compareResultAsTuples(vertices, expectedVertices);
+		compareResultAsTuples(edges, expectedEdges);
+	}
+
+	@Test
 	public void testTriplets() throws Exception {
 		/*
 		 * Test getTriplets()


Mime
View raw message