flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [2/3] flink git commit: [FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree
Date Tue, 07 Jun 2016 14:58:12 GMT
[FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree

This closes #2005


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a611271b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a611271b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a611271b

Branch: refs/heads/master
Commit: a611271b3ef7a084ec8e7edc4d4dc241550d7ad8
Parents: 2145497
Author: Greg Hogan <code@greghogan.com>
Authored: Wed May 18 11:28:24 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Tue Jun 7 09:03:21 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  19 ++
 .../degree/filter/undirected/MaximumDegree.java | 231 +++++++++++++++++++
 .../filter/undirected/MaximumDegreeTest.java    |  71 ++++++
 3 files changed, 321 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 05fbcb5..1f7f271 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2296,6 +2296,25 @@ DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
     </tr>
 
     <tr>
+      <td>degree.filter.undirected.<br/><strong>MaximumDegree</strong></td>
+      <td>
+        <p>Filter an <a href="#graph-representation">undirected graph</a> by maximum degree.</p>
+{% highlight java %}
+Graph<K, VV, EV> filteredGraph = graph
+  .run(new MaximumDegree(5000)
+    .setBroadcastHighDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setBroadcastHighDegreeVertices</strong>: join high-degree vertices using a broadcast-hash to reduce data shuffling when removing a relatively small number of high-degree vertices.</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>translate.<br/><strong>TranslateGraphIds</strong></td>
       <td>
         <p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
new file mode 100644
index 0000000..e7d78bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.filter.undirected;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Removes vertices from a graph with degree greater than the given maximum.
+ * Any edge with a source or target vertex with degree greater than the
+ * given maximum is also removed.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class MaximumDegree<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Required configuration
+	private final long maximumDegree;
+
+	// Optional configuration
+	private boolean reduceOnTargetId = false;
+
+	private boolean broadcastHighDegreeVertices = false;
+
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Filter out vertices with degree greater than the given maximum.
+	 *
+	 * @param maximumDegree maximum degree; must be greater than zero
+	 */
+	public MaximumDegree(long maximumDegree) {
+		Preconditions.checkArgument(maximumDegree > 0, "Maximum degree must be greater than zero");
+
+		this.maximumDegree = maximumDegree;
+	}
+
+	/**
+	 * The degree can be counted from either the edge source or target IDs.
+	 * By default the source IDs are counted. Reducing on target IDs may
+	 * optimize the algorithm if the input edge list is sorted by target ID.
+	 *
+	 * @param reduceOnTargetId set to {@code true} if the input edge list
+	 *                         is sorted by target ID
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
+		this.reduceOnTargetId = reduceOnTargetId;
+
+		return this;
+	}
+
+	/**
+	 * After filtering high-degree vertices this algorithm must perform joins
+	 * on the original graph's vertex set and on both the source and target IDs
+	 * of the edge set. These joins can be performed without shuffling data
+	 * over the network if the high-degree vertices are distributed by a
+	 * broadcast-hash.
+	 *
+	 * @param broadcastHighDegreeVertices set to {@code true} if the high-degree
+	 *                                    vertices should be broadcast when joining
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) {
+		this.broadcastHighDegreeVertices = broadcastHighDegreeVertices;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The three leftOuterJoin below could be implemented more efficiently
+	 *   as an anti-join when available in Flink.
+	 */
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// u, d(u)
+		DataSet<Vertex<K, LongValue>> vertexDegree = input
+			.run(new VertexDegree<K, VV, EV>()
+				.setReduceOnTargetId(reduceOnTargetId)
+				.setParallelism(parallelism));
+
+		// u if d(u) > maximumDegree
+		DataSet<Tuple1<K>> highDegreeVertices = vertexDegree
+			.flatMap(new DegreeFilter<K>(maximumDegree))
+				.setParallelism(parallelism)
+				.name("Filter high-degree vertices");
+
+		JoinHint joinHint = broadcastHighDegreeVertices ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
+
+		// Vertices: keep those with no match among the high-degree vertices
+		DataSet<Vertex<K, VV>> vertices = input
+			.getVertices()
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectVertex<K, VV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree vertices");
+
+		// Edges: drop any edge whose source or target is a high-degree vertex
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(reduceOnTargetId ? 1 : 0)
+			.equalTo(0)
+			.with(new ProjectEdge<K, EV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source"))
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(reduceOnTargetId ? 0 : 1)
+			.equalTo(0)
+			.with(new ProjectEdge<K, EV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target"));
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, input.getContext());
+	}
+
+	/**
+	 * Emit the ID of each vertex with degree greater than the given maximum.
+	 *
+	 * @param <K> ID type
+	 */
+	@ForwardedFields("0")
+	private static class DegreeFilter<K>
+	implements FlatMapFunction<Vertex<K, LongValue>, Tuple1<K>> {
+		private final long maximumDegree;
+
+		private final Tuple1<K> output = new Tuple1<>();
+
+		public DegreeFilter(long maximumDegree) {
+			this.maximumDegree = maximumDegree;
+		}
+
+		@Override
+		public void flatMap(Vertex<K, LongValue> value, Collector<Tuple1<K>> out)
+				throws Exception {
+			if (value.f1.getValue() > maximumDegree) {
+				output.f0 = value.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * Emit the vertex only when no high-degree ID was joined to it ({@code id} is null).
+	 *
+	 * @param <T> ID type
+	 * @param <VT> vertex value type
+	 */
+	@ForwardedFieldsFirst("0; 1")
+	private static class ProjectVertex<T, VT>
+	implements FlatJoinFunction<Vertex<T, VT>, Tuple1<T>, Vertex<T, VT>> {
+		@Override
+		public void join(Vertex<T, VT> vertex, Tuple1<T> id, Collector<Vertex<T, VT>> out)
+				throws Exception {
+			if (id == null) {
+				out.collect(vertex);
+			}
+		}
+	}
+
+	/**
+	 * Emit the edge only when no high-degree ID was joined to it ({@code id} is null).
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFieldsFirst("0; 1; 2")
+	private static class ProjectEdge<T, ET>
+	implements FlatJoinFunction<Edge<T, ET>, Tuple1<T>, Edge<T, ET>> {
+		@Override
+		public void join(Edge<T, ET> edge, Tuple1<T> id, Collector<Edge<T, ET>> out)
+				throws Exception {
+			if (id == null) {
+				out.collect(edge);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
new file mode 100644
index 0000000..b3a3356
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.filter.undirected;
+
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link MaximumDegree}. */
+public class MaximumDegreeTest extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph() throws Exception {
+		// The expected output omits vertex 3 and every edge outside {0, 1, 2}.
+		MaximumDegree<IntValue, NullValue, NullValue> filter =
+			new MaximumDegree<IntValue, NullValue, NullValue>(3);
+		Graph<IntValue, NullValue, NullValue> filtered = undirectedSimpleGraph.run(filter);
+
+		String vertexLines =
+			"(0,(null))\n" +
+			"(1,(null))\n" +
+			"(2,(null))\n" +
+			"(4,(null))\n" +
+			"(5,(null))";
+		TestBaseUtils.compareResultAsText(filtered.getVertices().collect(), vertexLines);
+
+		String edgeLines =
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))\n" +
+			"(1,2,(null))\n" +
+			"(2,0,(null))\n" +
+			"(2,1,(null))";
+		TestBaseUtils.compareResultAsText(filtered.getEdges().collect(), edgeLines);
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
+		// Pin the element count and checksum of the filtered graph.
+		Graph<LongValue, NullValue, NullValue> filtered = undirectedRMatGraph
+			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16));
+		ChecksumHashCode digest = GraphUtils.checksumHashCode(filtered);
+
+		assertEquals(805, digest.getCount());
+		assertEquals(0x0000000008028b43L, digest.getChecksum());
+	}
+}


Mime
View raw message