Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8C2A7200AC8 for ; Tue, 7 Jun 2016 16:58:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8AEB0160A57; Tue, 7 Jun 2016 14:58:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 86830160A4F for ; Tue, 7 Jun 2016 16:58:12 +0200 (CEST) Received: (qmail 56589 invoked by uid 500); 7 Jun 2016 14:58:11 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 56523 invoked by uid 99); 7 Jun 2016 14:58:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Jun 2016 14:58:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58D01E0593; Tue, 7 Jun 2016 14:58:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Date: Tue, 07 Jun 2016 14:58:12 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] flink git commit: [FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree archived-at: Tue, 07 Jun 2016 14:58:13 -0000 [FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree This closes #2005 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a611271b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a611271b Diff: 
http://git-wip-us.apache.org/repos/asf/flink/diff/a611271b Branch: refs/heads/master Commit: a611271b3ef7a084ec8e7edc4d4dc241550d7ad8 Parents: 2145497 Author: Greg Hogan Authored: Wed May 18 11:28:24 2016 -0400 Committer: Greg Hogan Committed: Tue Jun 7 09:03:21 2016 -0400 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 19 ++ .../degree/filter/undirected/MaximumDegree.java | 231 +++++++++++++++++++ .../filter/undirected/MaximumDegreeTest.java | 71 ++++++ 3 files changed, 321 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 05fbcb5..1f7f271 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2296,6 +2296,25 @@ DataSet>> pairDegree = graph + degree.filter.undirected.
MaximumDegree + +

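Filter an undirected graph by maximum degree: vertices with degree greater than the given maximum are removed, along with every edge incident to them.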

+{% highlight java %} +Graph filteredGraph = graph + .run(new MaximumDegree(5000) + .setBroadcastHighDegreeVertices(true) + .setReduceOnTargetId(true)); +{% endhighlight %} +

Optional configuration:

+
    +
  • setBroadcastHighDegreeVertices: join high-degree vertices using a broadcast-hash to reduce data shuffling when removing a relatively small number of high-degree vertices.

  • +
  • setParallelism: override the operator parallelism.

  • +
  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

  • +
+ + + + translate.
TranslateGraphIds

Translate vertex and edge IDs using the given TranslateFunction.

http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java new file mode 100644 index 0000000..e7d78bb --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.degree.filter.undirected; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Removes from a graph the vertices with degree greater than the given maximum. + * Any edge with a source or target vertex with degree greater than the + * given maximum is also removed. + * + * @param ID type + * @param vertex value type + * @param edge value type + */ +public class MaximumDegree +implements GraphAlgorithm> { + + // Required configuration + private long maximumDegree; + + // Optional configuration + private boolean reduceOnTargetId = false; + + private boolean broadcastHighDegreeVertices = false; + + private int parallelism = PARALLELISM_DEFAULT; + + /** + * Filter out vertices with degree greater than the given maximum. + * + * @param maximumDegree maximum degree, which must be greater than zero + */ + public MaximumDegree(long maximumDegree) { + Preconditions.checkArgument(maximumDegree > 0, "Maximum degree must be greater than zero"); + + this.maximumDegree = maximumDegree; + } + + /** + * The degree can be counted from either the edge source or target IDs. + * By default the source IDs are counted.
Reducing on target IDs may + * optimize the algorithm if the input edge list is sorted by target ID. + * + * @param reduceOnTargetId set to {@code true} if the input edge list + * is sorted by target ID + * @return this + */ + public MaximumDegree setReduceOnTargetId(boolean reduceOnTargetId) { + this.reduceOnTargetId = reduceOnTargetId; + + return this; + } + + /** + * After filtering high-degree vertices this algorithm must perform joins + * on the original graph's vertex set and on both the source and target IDs + * of the edge set. These joins can be performed without shuffling data + * over the network if the high-degree vertices are distributed by a + * broadcast-hash. + * + * @param broadcastHighDegreeVertices set to {@code true} if the high-degree + * vertices should be broadcast when joining + * @return this + */ + public MaximumDegree setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) { + this.broadcastHighDegreeVertices = broadcastHighDegreeVertices; + + return this; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public MaximumDegree setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + /* + * Implementation notes: + * + * The three leftOuterJoins below could be implemented more efficiently + * as an anti-join when available in Flink. + */ + + @Override + public Graph run(Graph input) + throws Exception { + // u, d(u) + DataSet> vertexDegree = input + .run(new VertexDegree() + .setReduceOnTargetId(reduceOnTargetId) + .setParallelism(parallelism)); + + // u if d(u) > maximumDegree (only the ID is retained, not the degree) + DataSet> highDegreeVertices = vertexDegree + .flatMap(new DegreeFilter(maximumDegree)) + .setParallelism(parallelism) + .name("Filter high-degree vertices"); + + JoinHint joinHint = broadcastHighDegreeVertices ?
JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND; + + // Vertices: keep only those with no matching high-degree vertex ID + DataSet> vertices = input + .getVertices() + .leftOuterJoin(highDegreeVertices, joinHint) + .where(0) + .equalTo(0) + .with(new ProjectVertex()) + .setParallelism(parallelism) + .name("Project low-degree vertices"); + + // Edges: drop any edge whose source or target is a high-degree vertex (one join per endpoint) + DataSet> edges = input + .getEdges() + .leftOuterJoin(highDegreeVertices, joinHint) + .where(reduceOnTargetId ? 1 : 0) + .equalTo(0) + .with(new ProjectEdge()) + .setParallelism(parallelism) + .name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source")) + .leftOuterJoin(highDegreeVertices, joinHint) + .where(reduceOnTargetId ? 0 : 1) + .equalTo(0) + .with(new ProjectEdge()) + .setParallelism(parallelism) + .name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target")); + + // Graph + return Graph.fromDataSet(vertices, edges, input.getContext()); + } + + /** + * Emit the IDs of vertices with degree greater than the given maximum. + * + * @param ID type + */ + @ForwardedFields("0") + private static class DegreeFilter + implements FlatMapFunction, Tuple1> { + private long maximumDegree; + + private Tuple1 output = new Tuple1<>(); + + public DegreeFilter(long maximumDegree) { + this.maximumDegree = maximumDegree; + } + + @Override + public void flatMap(Vertex value, Collector> out) + throws Exception { + if (value.f1.getValue() > maximumDegree) { + output.f0 = value.f0; + out.collect(output); + } + } + } + + /** + * Emit the vertex only when the outer join found no matching high-degree vertex ID ({@code id == null}). + * + * @param ID type + * @param vertex value type + */ + @ForwardedFieldsFirst("0; 1") + private static class ProjectVertex + implements FlatJoinFunction, Tuple1, Vertex> { + @Override + public void join(Vertex vertex, Tuple1 id, Collector> out) + throws Exception { + if (id == null) { + out.collect(vertex); + } + } + } + + /** + * Emit the edge only when the outer join found no matching high-degree vertex ID ({@code id == null}).
+ * + * @param ID type + * @param edge value type + */ + @ForwardedFieldsFirst("0; 1; 2") + private static class ProjectEdge + implements FlatJoinFunction, Tuple1, Edge> { + @Override + public void join(Edge edge, Tuple1 id, Collector> out) + throws Exception { + if (id == null) { + out.collect(edge); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java new file mode 100644 index 0000000..b3a3356 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.graph.asm.degree.filter.undirected; + +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MaximumDegreeTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Graph graph = undirectedSimpleGraph + .run(new MaximumDegree(3)); + // expected output omits vertex 3 and every edge touching it (presumably its degree exceeds 3; see AsmTestBase) + String expectedVerticesResult = + "(0,(null))\n" + + "(1,(null))\n" + + "(2,(null))\n" + + "(4,(null))\n" + + "(5,(null))"; + + TestBaseUtils.compareResultAsText(graph.getVertices().collect(), expectedVerticesResult); + + String expectedEdgesResult = + "(0,1,(null))\n" + + "(0,2,(null))\n" + + "(1,0,(null))\n" + + "(1,2,(null))\n" + + "(2,0,(null))\n" + + "(2,1,(null))"; + + TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expectedEdgesResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode checksum = GraphUtils.checksumHashCode(undirectedRMatGraph + .run(new MaximumDegree(16))); + // regression constants for the filtered graph, presumably recorded from a reference run + assertEquals(805, checksum.getCount()); + assertEquals(0x0000000008028b43L, checksum.getChecksum()); + } +}