flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject flink git commit: [FLINK-2411] [gelly] Add Summarization Algorithm
Date Mon, 26 Oct 2015 10:44:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3dc7423c4 -> daa357aca


[FLINK-2411] [gelly] Add Summarization Algorithm

* implemented algorithm
* implemented integration tests
* updated gelly guide

This closes #1269


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa357ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa357ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa357ac

Branch: refs/heads/master
Commit: daa357aca1fe7745eb7809ac384c5fdc14bd74db
Parents: 3dc7423
Author: Martin Junghanns <m.junghanns@mailbox.org>
Authored: Tue Oct 20 09:28:40 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Mon Oct 26 11:12:03 2015 +0100

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  25 +
 .../main/java/org/apache/flink/graph/Graph.java |   6 +-
 .../graph/example/utils/SummarizationData.java  | 134 +++++
 .../flink/graph/library/Summarization.java      | 521 +++++++++++++++++++
 .../graph/test/library/SummarizationITCase.java | 189 +++++++
 5 files changed, 873 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 646ec7f..59e1a3b 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1437,6 +1437,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing
large-sc
 * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
 * [GSA Triangle Count](#gsa-triangle-count)
 * [Triangle Enumerator](#triangle-enumerator)
+* [Summarization](#summarization)
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 
@@ -1617,4 +1618,28 @@ This implementation extends the basic algorithm by computing output
degrees of e
 The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3`. The Vertex
ID type has to be `Comparable`.
 Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of the vertices
forming the triangle.
 
+### Summarization
+
+#### Overview
+The summarization algorithm computes a condensed version of the input graph by grouping vertices and edges based on
+their values. In doing so, the algorithm helps to uncover insights about patterns and distributions in the graph.
+One possible use case is the visualization of communities where the whole graph is too large and needs to be summarized
+based on the community identifier stored at a vertex.
+
+#### Details
+In the resulting graph, each vertex represents a group of vertices that share the same value. An edge that connects a
+vertex with itself represents all edges with the same edge value that connect vertices from the same vertex group. An
+edge between different vertices in the output graph represents all edges with the same edge value between members of
+different vertex groups in the input graph.
+
+The algorithm is implemented using Flink data operators. First, vertices are grouped by their value and a representative
+is chosen from each group. For any edge, the source and target vertex identifiers are replaced with the corresponding
+representative and grouped by source, target and edge value. Output vertices and edges are created from their
+corresponding groupings.
+
+#### Usage
+The algorithm takes a directed, vertex (and possibly edge) attributed graph as input and outputs a new graph where each
+vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each
+vertex and edge in the output graph stores the common group value and the number of represented elements.
+
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 6015be4..68bfb5b 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1370,7 +1370,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
 				.with(new VerticesRemovalCoGroup<K, VV>());
 
-		DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
+		DataSet <Edge< K, EV>> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
 				// if the edge source was removed, the edge will also be removed
 				.with(new ProjectEdgeToBeRemoved<K, VV, EV>())
 				// if the edge target was removed, the edge will also be removed
@@ -1485,7 +1485,9 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Performs Difference on the vertex and edge sets of the input graphs
-	 * removes common vertices and edges. If a source/target vertex is removed, its corresponding
edge will also be removed
+	 * removes common vertices and edges. If a source/target vertex is removed,
+	 * its corresponding edge will also be removed
+	 * 
 	 * @param graph the graph to perform difference with
 	 * @return a new graph where the common vertices and edges have been removed
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
new file mode 100644
index 0000000..88f76cc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Provides the default data set used for Summarization tests.
+ */
+public class SummarizationData {
+
+	private SummarizationData() {}
+
+	/**
+	 * The resulting vertex id can be any id of the vertices summarized by the single vertex.
+	 *
+	 * Format:
+	 *
+	 * "possible-id[,possible-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_VERTICES = new String[] {
+			"0,1;A,2",
+			"2,3,4;B,3",
+			"5;C,1"
+	};
+
+	/**
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
+			"0,1;0,1;A,2",
+			"0,1;2,3,4;A,1",
+			"2,3,4;0,1;A,1",
+			"2,3,4;0,1;C,2",
+			"2,3,4;2,3,4;B,2",
+			"5;2,3,4;D,2"
+	};
+
+	/**
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new String[] {
+			"0,1;0,1;(null),2",
+			"0,1;2,3,4;(null),1",
+			"2,3,4;0,1;(null),3",
+			"2,3,4;2,3,4;(null),2",
+			"5;2,3,4;(null),2"
+	};
+
+	/**
+	 * Creates a set of vertices with attached {@link String} values.
+	 *
+	 * @param env execution environment
+	 * @return vertex data set with string values
+	 */
+	public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment
env) {
+		List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(6);
+		vertices.add(new Vertex<>(0L, "A"));
+		vertices.add(new Vertex<>(1L, "A"));
+		vertices.add(new Vertex<>(2L, "B"));
+		vertices.add(new Vertex<>(3L, "B"));
+		vertices.add(new Vertex<>(4L, "B"));
+		vertices.add(new Vertex<>(5L, "C"));
+
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * Creates a set of edges with attached {@link String} values.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with string values
+	 */
+	public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env)
{
+		List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(10);
+		edges.add(new Edge<>(0L, 1L, "A"));
+		edges.add(new Edge<>(1L, 0L, "A"));
+		edges.add(new Edge<>(1L, 2L, "A"));
+		edges.add(new Edge<>(2L, 1L, "A"));
+		edges.add(new Edge<>(2L, 3L, "B"));
+		edges.add(new Edge<>(3L, 2L, "B"));
+		edges.add(new Edge<>(4L, 0L, "C"));
+		edges.add(new Edge<>(4L, 1L, "C"));
+		edges.add(new Edge<>(5L, 2L, "D"));
+		edges.add(new Edge<>(5L, 3L, "D"));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Creates a set of edges with {@link NullValue} as edge value.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with null values
+	 */
+	@SuppressWarnings("serial")
+	public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment
env) {
+		return getEdges(env).map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>()
{
+			@Override
+			public Edge<Long, NullValue> map(Edge<Long, String> value) throws Exception
{
+				return new Edge<>(value.getSource(), value.getTarget(), NullValue.getInstance());
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
new file mode 100644
index 0000000..0dcdc1f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The summarization algorithm computes a condensed version of the input graph<br>
+ * by grouping vertices and edges based on their values. By doing this, the<br>
+ * algorithm helps to uncover insights about patterns and distributions in the<br>
+ * graph.
+ * <p>
+ * In the resulting graph, each vertex represents a group of vertices that share the<br>
+ * same vertex value. An edge, that connects a vertex with itself, represents all edges<br>
+ * with the same edge value that connect vertices inside that group. An edge between<br>
+ * vertices in the output graph represents all edges with the same edge value between<br>
+ * members of those groups in the input graph.
+ * <p>
+ * Consider the following example:
+ * <p>
+ * Input graph:
+ * <p>
+ * Vertices (id, value):<br>
+ * (0, "A")<br>
+ * (1, "A")<br>
+ * (2, "B")<br>
+ * (3, "B")<br>
+ * <p>
+ * Edges (source, target, value):
+ * (0,1, null)<br>
+ * (1,0, null)<br>
+ * (1,2, null)<br>
+ * (2,1, null)<br>
+ * (2,3, null)<br>
+ * (3,2, null)<br>
+ * <p>
+ * Output graph:
+ * <p>
+ * Vertices (id, (value, count)):<br>
+ * (0, ("A", 2)) // 0 and 1 <br>
+ * (2, ("B", 2)) // 2 and 3 <br>
+ * <p>
+ * Edges (source, target, (value, count)):<br>
+ * (0, 0, (null, 2)) // (0,1) and (1,0) <br>
+ * (2, 2, (null, 2)) // (2,3) and (3,2) <br>
+ * (0, 2, (null, 1)) // (1,2) <br>
+ * (2, 0, (null, 1)) // (2,1) <br>
+ *
+ * Note that this implementation is non-deterministic in the way that it assigns<br>
+ * identifiers to summarized vertices. However, it is guaranteed that the identifier<br>
+ * is one of the represented vertex identifiers.
+ *
+ * @param <K> 	vertex identifier type
+ * @param <VV> 	vertex value type
+ * @param <EV> 	edge value type
+ */
+public class Summarization<K, VV, EV>
+		implements GraphAlgorithm<K, VV, EV,
+		Graph<K, Summarization.VertexValue<VV>, Summarization.EdgeValue<EV>>>
{
+
+	@Override
+	public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV>
input) throws Exception {
+		// -------------------------
+		// build summarized vertices
+		// -------------------------
+
+		// group vertices by value
+		UnsortedGrouping<Vertex<K, VV>> vertexUnsortedGrouping = input.getVertices()
+				.groupBy(1);
+		// reduce vertex group and create vertex group items
+		GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems
= vertexUnsortedGrouping
+				.reduceGroup(new VertexGroupReducer<K, VV>());
+		// create summarized vertices
+		DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
+				.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
+				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
+		// create mapping between vertices and their representative
+		DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
+				.filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
+				.map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+
+		// -------------------------
+		// build summarized edges
+		// -------------------------
+
+		// join edges with vertex representatives and update source and target identifiers
+		DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
+				.join(vertexToRepresentativeMap)
+				.where(0) 	// source vertex id
+				.equalTo(0) // vertex id
+				.with(new SourceVertexJoinFunction<K, EV>())
+				.join(vertexToRepresentativeMap)
+				.where(1) 	// target vertex id
+				.equalTo(0) // vertex id
+				.with(new TargetVertexJoinFunction<K, EV>());
+		// create summarized edges
+		DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping
+				.groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2)
+				.reduceGroup(new EdgeGroupReducer<K, EV>());
+
+		return Graph.fromDataSet(summarizedVertices, summarizedEdges, input.getContext());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Tuple Types
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Value that is stored at a summarized vertex.
+	 *
+	 * f0: vertex group value
+	 * f1: vertex group count
+	 *
+	 * @param <VV> vertex value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexValue<VV> extends Tuple2<VV, Long> {
+
+		public VV getVertexGroupValue() {
+			return f0;
+		}
+
+		public void setVertexGroupValue(VV vertexGroupValue) {
+			f0 = vertexGroupValue;
+		}
+
+		public Long getVertexGroupCount() {
+			return f1;
+		}
+
+		public void setVertexGroupCount(Long vertexGroupCount) {
+			f1 = vertexGroupCount;
+		}
+	}
+
+	/**
+	 * Value that is stored at a summarized edge.
+	 *
+	 * f0: edge group value
+	 * f1: edge group count
+	 *
+	 * @param <EV> edge value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class EdgeValue<EV> extends Tuple2<EV, Long> {
+
+		public EV getEdgeGroupValue() {
+			return f0;
+		}
+
+		public void setEdgeGroupValue(EV edgeGroupValue) {
+			f0 = edgeGroupValue;
+		}
+
+		public Long getEdgeGroupCount() {
+			return f1;
+		}
+
+		public void setEdgeGroupCount(Long edgeGroupCount) {
+			f1 = edgeGroupCount;
+		}
+	}
+
+	/**
+	 * Represents a single vertex in a vertex group.
+	 *
+	 * f0: vertex identifier
+	 * f1: vertex group representative identifier
+	 * f2: vertex group value
+	 * f3: vertex group count
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VGV> vertex group value type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, VGV, Long>
{
+
+		public VertexGroupItem() {
+			setVertexGroupCount(0L);
+		}
+
+		public K getVertexId() {
+			return f0;
+		}
+
+		public void setVertexId(K vertexId) {
+			f0 = vertexId;
+		}
+
+		public K getGroupRepresentativeId() {
+			return f1;
+		}
+
+		public void setGroupRepresentativeId(K groupRepresentativeId) {
+			f1 = groupRepresentativeId;
+		}
+
+		public VGV getVertexGroupValue() {
+			return f2;
+		}
+
+		public void setVertexGroupValue(VGV vertexGroupValue) {
+			f2 = vertexGroupValue;
+		}
+
+		public Long getVertexGroupCount() {
+			return f3;
+		}
+
+		public void setVertexGroupCount(Long vertexGroupCount) {
+			f3 = vertexGroupCount;
+		}
+
+		/**
+		 * Resets the fields to initial values. This is necessary if the tuples are reused and
not all fields were modified.
+		 */
+		public void reset() {
+			f0 = null;
+			f1 = null;
+			f2 = null;
+			f3 = 0L;
+		}
+	}
+
+	/**
+	 * Represents a vertex identifier and its corresponding vertex group identifier.
+	 *
+	 * @param <K> vertex identifier type
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexWithRepresentative<K> extends Tuple2<K, K> {
+
+		public void setVertexId(K vertexId) {
+			f0 = vertexId;
+		}
+
+		public K getGroupRepresentativeId() {
+			return f1;
+		}
+
+		public void setGroupRepresentativeId(K groupRepresentativeId) {
+			f1 = groupRepresentativeId;
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates one {@link VertexGroupItem} for each group element containing the vertex identifier
and the identifier
+	 * of the group representative which is the first vertex in the reduce input iterable.
+	 *
+	 * Creates one {@link VertexGroupItem} representing the whole group that contains the vertex
identifier of the
+	 * group representative, the vertex group value and the total number of group elements.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VV> 	vertex value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexGroupReducer<K, VV>
+			implements GroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>>
{
+
+		private final VertexGroupItem<K, VV> reuseVertexGroupItem;
+
+		private VertexGroupReducer() {
+			this.reuseVertexGroupItem = new VertexGroupItem<>();
+		}
+
+		@Override
+		public void reduce(Iterable<Vertex<K, VV>> values, Collector<VertexGroupItem<K,
VV>> out) throws Exception {
+			K vertexGroupRepresentativeID = null;
+			long vertexGroupCount = 0L;
+			VV vertexGroupValue = null;
+			boolean isFirstElement = true;
+
+			for (Vertex<K, VV> vertex : values) {
+				if (isFirstElement) {
+					// take final group representative vertex id from first tuple
+					vertexGroupRepresentativeID = vertex.getId();
+					vertexGroupValue = vertex.getValue();
+					isFirstElement = false;
+				}
+				// no need to set group value for those tuples
+				reuseVertexGroupItem.setVertexId(vertex.getId());
+				reuseVertexGroupItem.setGroupRepresentativeId(vertexGroupRepresentativeID);
+				out.collect(reuseVertexGroupItem);
+				vertexGroupCount++;
+			}
+
+			createGroupRepresentativeTuple(vertexGroupRepresentativeID, vertexGroupValue, vertexGroupCount);
+			out.collect(reuseVertexGroupItem);
+			reuseVertexGroupItem.reset();
+		}
+
+		/**
+		 * Creates one tuple representing the whole group. This tuple is later used to create a
summarized vertex for each
+		 * group.
+		 *
+		 * @param vertexGroupRepresentativeId group representative vertex identifier
+		 * @param vertexGroupValue  					group property value
+		 * @param vertexGroupCount          	total group count
+		 */
+		private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId,
+																								VV vertexGroupValue,
+																								Long vertexGroupCount) {
+			reuseVertexGroupItem.setVertexId(vertexGroupRepresentativeId);
+			reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
+			reuseVertexGroupItem.setVertexGroupCount(vertexGroupCount);
+		}
+	}
+
+	/**
+	 * Creates a summarized edge from a group of edges. Counts the number of elements in the
group.
+	 *
+	 * @param <K> vertex identifier type
+	 * @param <EV> edge group value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class EdgeGroupReducer<K, EV>
+			implements GroupReduceFunction<Edge<K, EV>, Edge<K, EdgeValue<EV>>>
{
+
+		private final Edge<K, EdgeValue<EV>> reuseEdge;
+
+		private final EdgeValue<EV> reuseEdgeValue;
+
+		private EdgeGroupReducer() {
+			reuseEdge = new Edge<>();
+			reuseEdgeValue = new EdgeValue<>();
+		}
+
+		@Override
+		public void reduce(Iterable<Edge<K, EV>> values, Collector<Edge<K, EdgeValue<EV>>>
out) throws Exception {
+			K sourceVertexId = null;
+			K targetVertexId = null;
+			EV edgeGroupValue = null;
+			Long edgeGroupCount = 0L;
+			boolean isFirstElement = true;
+
+			for (Edge<K, EV> edge : values) {
+				if (isFirstElement) {
+					sourceVertexId = edge.getSource();
+					targetVertexId = edge.getTarget();
+					edgeGroupValue = edge.getValue();
+					isFirstElement = false;
+				}
+				edgeGroupCount++;
+			}
+			reuseEdgeValue.setEdgeGroupValue(edgeGroupValue);
+			reuseEdgeValue.setEdgeGroupCount(edgeGroupCount);
+			reuseEdge.setSource(sourceVertexId);
+			reuseEdge.setTarget(targetVertexId);
+			reuseEdge.setValue(reuseEdgeValue);
+			out.collect(reuseEdge);
+		}
+	}
+
+	/**
+	 * Filter tuples that are representing a vertex group. They are used to create new summarized
vertices and have a
+	 * group count greater than zero.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VV> 	vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("*->*")
+	private static final class VertexGroupItemToSummarizedVertexFilter<K, VV>
+			implements FilterFunction<VertexGroupItem<K, VV>> {
+
+		@Override
+		public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+			return !vertexGroupItem.getVertexGroupCount().equals(0L);
+		}
+	}
+
+	/**
+	 * Filter tuples that are representing a single vertex. They are used to update the source
and target vertex
+	 * identifiers at the edges.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VV> 	vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("*->*")
+	private static final class VertexGroupItemToRepresentativeFilter<K, VV>
+			implements FilterFunction<VertexGroupItem<K, VV>> {
+
+		@Override
+		public boolean filter(VertexGroupItem<K, VV> vertexGroupItem) throws Exception {
+			return vertexGroupItem.getVertexGroupCount().equals(0L);
+		}
+	}
+
+	/**
+	 * Creates a new vertex representing a vertex group. The vertex stores the group value and
the number of vertices in
+	 * the group.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VV> 	vertex value type
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexGroupItemToSummarizedVertexMapper<K, VV>
+			implements MapFunction<VertexGroupItem<K, VV>, Vertex<K, VertexValue<VV>>>
{
+
+		private final VertexValue<VV> reuseSummarizedVertexValue;
+
+		private VertexGroupItemToSummarizedVertexMapper() {
+			reuseSummarizedVertexValue = new VertexValue<>();
+		}
+
+		@Override
+		public Vertex<K, VertexValue<VV>> map(VertexGroupItem<K, VV> value) throws
Exception {
+			K vertexId = value.getVertexId();
+			reuseSummarizedVertexValue.setVertexGroupValue(value.getVertexGroupValue());
+			reuseSummarizedVertexValue.setVertexGroupCount(value.getVertexGroupCount());
+			return new Vertex<>(vertexId, reuseSummarizedVertexValue);
+		}
+	}
+
+	/**
+	 * Creates a {@link VertexWithRepresentative} from a {@link VertexGroupItem}.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <VV> 	vertex value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFields("f0;f1")
+	private static final class VertexGroupItemToVertexWithRepresentativeMapper<K, VV>
+			implements MapFunction<VertexGroupItem<K, VV>, VertexWithRepresentative<K>>
{
+
+		private final VertexWithRepresentative<K> reuseVertexWithRepresentative;
+
+		private VertexGroupItemToVertexWithRepresentativeMapper() {
+			reuseVertexWithRepresentative = new VertexWithRepresentative<>();
+		}
+
+		@Override
+		public VertexWithRepresentative<K> map(VertexGroupItem<K, VV> vertexGroupItem)
throws Exception {
+			reuseVertexWithRepresentative.setVertexId(vertexGroupItem.getVertexId());
+			reuseVertexWithRepresentative.setGroupRepresentativeId(vertexGroupItem.getGroupRepresentativeId());
+			return reuseVertexWithRepresentative;
+		}
+	}
+
+	/**
+	 * Replaces the source vertex id with the vertex group representative id and adds the edge
group value.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <EV> 	edge value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFieldsFirst("f1") 			// edge target id
+	@FunctionAnnotation.ForwardedFieldsSecond("f1->f0") // vertex group id -> edge source
id
+	private static final class SourceVertexJoinFunction<K, EV>
+			implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K,
EV>> {
+
+		private final Edge<K, EV> reuseEdge;
+
+		private SourceVertexJoinFunction() {
+			this.reuseEdge = new Edge<>();
+		}
+
+		@Override
+		public Edge<K, EV> join(Edge<K, EV> edge, VertexWithRepresentative<K>
vertex) throws Exception {
+			reuseEdge.setSource(vertex.getGroupRepresentativeId());
+			reuseEdge.setTarget(edge.getTarget());
+			reuseEdge.setValue(edge.getValue());
+			return reuseEdge;
+		}
+	}
+
+	/**
+	 * Replaces the target vertex id with the vertex group identifier.
+	 *
+	 * @param <K> 	vertex identifier type
+	 * @param <EV> 	edge group value type
+	 */
+	@SuppressWarnings("serial")
+	@FunctionAnnotation.ForwardedFieldsFirst("f0;f2") // source vertex id, edge group value
+	@FunctionAnnotation.ForwardedFieldsSecond("f1") 	// vertex group id -> edge target id
+	private static final class TargetVertexJoinFunction<K, EV>
+			implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K,
EV>> {
+		@Override
+		public Edge<K, EV> join(Edge<K, EV> edge,
+														VertexWithRepresentative<K> vertexRepresentative) throws Exception {
+			edge.setTarget(vertexRepresentative.getGroupRepresentativeId());
+			return edge;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa357ac/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
new file mode 100644
index 0000000..abb4511
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SummarizationData;
+import org.apache.flink.graph.library.Summarization;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+	private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+	private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+	public SummarizationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Summarizes a graph that carries both vertex and edge values and checks the result.
+	 */
+	@Test
+	public void testWithVertexAndEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, String> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdges(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
+				input.run(new Summarization<Long, String, String>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+	}
+
+	/**
+	 * Summarizes a graph whose edges carry {@link NullValue} and checks the result.
+	 */
+	@Test
+	public void testWithVertexAndAbsentEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, NullValue> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdgesWithAbsentValues(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
+				input.run(new Summarization<Long, String, NullValue>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
+	}
+
+	/**
+	 * Checks the summarized vertices against the expected vertex descriptions.
+	 *
+	 * <p>Each expected entry has the form {@code "<ids>;<groupValue>,<groupCount>"}, where
+	 * {@code <ids>} is a comma-separated list of vertex ids that must contain the actual id.
+	 *
+	 * @param expectedVertices expected vertex descriptions; not modified
+	 * @param actualVertices   collected vertices; sorted in place
+	 */
+	private void validateVertices(String[] expectedVertices,
+			List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+		// Sort a copy: the expected arrays are static fixtures shared between test methods.
+		String[] sortedExpected = Arrays.copyOf(expectedVertices, expectedVertices.length);
+		Arrays.sort(sortedExpected);
+		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+			@Override
+			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
+					Vertex<Long, Summarization.VertexValue<String>> o2) {
+				int result = o1.getId().compareTo(o2.getId());
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupCount().compareTo(o2.getValue().getVertexGroupCount());
+				}
+				return result;
+			}
+		});
+
+		// Without this check, extra actual vertices would go unnoticed and missing ones would
+		// surface as an IndexOutOfBoundsException instead of an assertion failure.
+		assertEquals("number of summarized vertices", sortedExpected.length, actualVertices.size());
+		// NOTE(review): entries are paired by position. This relies on the lexicographic order of
+		// the expected strings matching the numeric id order of the actual vertices, which only
+		// holds while the ids have the same number of digits — TODO confirm for new fixtures.
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateVertex(sortedExpected[i], actualVertices.get(i));
+		}
+	}
+
+	/**
+	 * Checks the summarized edges against the expected edge descriptions.
+	 *
+	 * <p>Each expected entry has the form
+	 * {@code "<sourceIds>;<targetIds>;<groupValue>,<groupCount>"}.
+	 *
+	 * @param expectedEdges expected edge descriptions; not modified
+	 * @param actualEdges   collected edges; sorted in place
+	 * @param <EV>          edge group value type
+	 */
+	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
+			List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+		// Sort a copy: the expected arrays are static fixtures shared between test methods.
+		String[] sortedExpected = Arrays.copyOf(expectedEdges, expectedEdges.length);
+		Arrays.sort(sortedExpected);
+		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>>() {
+
+			@Override
+			public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
+				int result = o1.getSource().compareTo(o2.getSource());
+				if (result == 0) {
+					result = o1.getTarget().compareTo(o2.getTarget());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+				}
+				return result;
+			}
+		});
+
+		assertEquals("number of summarized edges", sortedExpected.length, actualEdges.size());
+		for (int i = 0; i < sortedExpected.length; i++) {
+			validateEdge(sortedExpected[i], actualEdges.get(i));
+		}
+	}
+
+	/**
+	 * Asserts that one vertex matches its expected description
+	 * ({@code "<ids>;<groupValue>,<groupCount>"}).
+	 */
+	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
+	}
+
+	/**
+	 * Asserts that one edge matches its expected description
+	 * ({@code "<sourceIds>;<targetIds>;<groupValue>,<groupCount>"}); the group value is
+	 * compared via its {@code toString()} form.
+	 */
+	private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+		assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+		assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
+		assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
+	}
+
+	/** Parses a comma-separated list of vertex ids. */
+	private List<Long> getListFromIdRange(String idRange) {
+		List<Long> result = Lists.newArrayList();
+		for (String id : ID_SEPARATOR.split(idRange)) {
+			result.add(Long.parseLong(id));
+		}
+		return result;
+	}
+
+	/** Returns the group value part of a {@code "<value>,<count>"} token. */
+	private String getGroupValue(String token) {
+		return ID_SEPARATOR.split(token)[0];
+	}
+
+	/** Returns the group count part of a {@code "<value>,<count>"} token. */
+	private Long getGroupCount(String token) {
+		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+	}
+}


Mime
View raw message