Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C4B9718EA5 for ; Sat, 22 Aug 2015 19:39:32 +0000 (UTC) Received: (qmail 59818 invoked by uid 500); 22 Aug 2015 19:39:32 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 59723 invoked by uid 500); 22 Aug 2015 19:39:32 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 59690 invoked by uid 99); 22 Aug 2015 19:39:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 19:39:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6E47DDFFED; Sat, 22 Aug 2015 19:39:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Sat, 22 Aug 2015 19:39:32 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] flink git commit: [FLINK-2451] [gelly] library methods cleanup Repository: flink Updated Branches: refs/heads/master 3a8302998 -> 5ae84273c [FLINK-2451] [gelly] library methods cleanup Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2d061c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2d061c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2d061c3 Branch: refs/heads/master Commit: d2d061c3004dcdc31a1df6b422a6aba6d9c294f8 Parents: 970ab35 Author: vasia Authored: Mon Aug 3 15:20:44 2015 +0200 Committer: vasia Committed: Sat Aug 22 20:46:20 2015 +0200 ---------------------------------------------------------------------- .../graph/example/ConnectedComponents.java | 4 +- .../flink/graph/example/MusicProfiles.java | 4 +- .../flink/graph/library/CommunityDetection.java | 173 +++++++++++++++++++ .../library/CommunityDetectionAlgorithm.java | 173 ------------------- .../graph/library/ConnectedComponents.java | 88 ++++++++++ .../library/ConnectedComponentsAlgorithm.java | 89 ---------- .../graph/library/GSAConnectedComponents.java | 127 ++------------ .../apache/flink/graph/library/GSAPageRank.java | 161 +++-------------- .../library/GSASingleSourceShortestPaths.java | 126 +++----------- .../flink/graph/library/LabelPropagation.java | 113 ++++++++++++ .../library/LabelPropagationAlgorithm.java | 115 ------------ .../apache/flink/graph/library/PageRank.java | 105 +++++++++++ .../flink/graph/library/PageRankAlgorithm.java | 105 ----------- .../library/SingleSourceShortestPaths.java | 111 ++++++++++++ .../SingleSourceShortestPathsAlgorithm.java | 111 ------------ 15 files changed, 653 insertions(+), 952 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java index b841ced..a4a6708 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java @@ -27,7 +27,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; -import org.apache.flink.graph.library.ConnectedComponentsAlgorithm; +import org.apache.flink.graph.library.GSAConnectedComponents; import org.apache.flink.types.NullValue; /** @@ -69,7 +69,7 @@ public class ConnectedComponents implements ProgramDescription { }, env); DataSet> verticesWithMinIds = graph - .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices(); + .run(new GSAConnectedComponents(maxIterations)).getVertices(); // emit result if (fileOutput) { http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index 7643976..a56224d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -38,7 +38,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.MusicProfilesData; -import org.apache.flink.graph.library.LabelPropagationAlgorithm; +import org.apache.flink.graph.library.LabelPropagation; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; @@ -153,7 +153,7 @@ public class MusicProfiles implements ProgramDescription { public Long map(Tuple2 value) { return value.f1; } - }).run(new LabelPropagationAlgorithm(maxIterations)) + }).run(new LabelPropagation(maxIterations)) .getVertices(); if (fileOutput) { http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java new file mode 100644 index 0000000..21bef53 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Community Detection Algorithm. + * + * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value. + * The vertices propagate their labels and max scores in iterations, each time adopting the label with the + * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction + * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value. + * + * The algorithm converges when vertices no longer update their value or when the maximum number of iterations + * is reached. + * + * @see article explaining the algorithm in detail + */ +public class CommunityDetection implements GraphAlgorithm { + + private Integer maxIterations; + + private Double delta; + + public CommunityDetection(Integer maxIterations, Double delta) { + + this.maxIterations = maxIterations; + this.delta = delta; + } + + @Override + public Graph run(Graph graph) { + + Graph undirectedGraph = graph.getUndirected(); + + Graph, Double> graphWithScoredVertices = undirectedGraph + .mapVertices(new AddScoreToVertexValuesMapper()); + + return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta), + new LabelMessenger(), maxIterations) + .mapVertices(new RemoveScoreFromVertexValuesMapper()); + } + + @SuppressWarnings("serial") + public static final class VertexLabelUpdater extends VertexUpdateFunction, Tuple2> { + + private Double delta; + + public VertexLabelUpdater(Double delta) { + this.delta = delta; + } + + @Override + public void updateVertex(Vertex> vertex, + MessageIterator> inMessages) throws Exception { + + // we would like these two maps to be ordered + Map receivedLabelsWithScores = new TreeMap(); + Map labelsWithHighestScore = new TreeMap(); + + for (Tuple2 message : inMessages) { + // split the message into received label and score + Long receivedLabel = message.f0; + Double receivedScore = message.f1; + + // if the label was received before + if (receivedLabelsWithScores.containsKey(receivedLabel)) { + Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel); + receivedLabelsWithScores.put(receivedLabel, newScore); + } else { + // first time we see the label + receivedLabelsWithScores.put(receivedLabel, receivedScore); + } + + // store the labels with the highest scores + if (labelsWithHighestScore.containsKey(receivedLabel)) { + Double currentScore = labelsWithHighestScore.get(receivedLabel); + if (currentScore < receivedScore) { + // record the highest score + labelsWithHighestScore.put(receivedLabel, receivedScore); + } + } else { + // first time we see this label + labelsWithHighestScore.put(receivedLabel, receivedScore); + } + } + + if(receivedLabelsWithScores.size() > 0) { + // find the label with the highest score from the ones received + Double maxScore = -Double.MAX_VALUE; + Long maxScoreLabel = vertex.getValue().f0; + for (Long curLabel : receivedLabelsWithScores.keySet()) { + + if (receivedLabelsWithScores.get(curLabel) > maxScore) { + maxScore = receivedLabelsWithScores.get(curLabel); + maxScoreLabel = curLabel; + } + } + + // find the highest score of maxScoreLabel + Double highestScore = labelsWithHighestScore.get(maxScoreLabel); + // re-score the new label + if (maxScoreLabel != vertex.getValue().f0) { + highestScore -= delta / getSuperstepNumber(); + } + // else delta = 0 + // update own label + setNewVertexValue(new Tuple2(maxScoreLabel, highestScore)); + } + } + } + + @SuppressWarnings("serial") + public static final class LabelMessenger extends MessagingFunction, + Tuple2, Double> { + + @Override + public void sendMessages(Vertex> vertex) throws Exception { + + for(Edge edge : getEdges()) { + sendMessageTo(edge.getTarget(), new Tuple2(vertex.getValue().f0, + vertex.getValue().f1 * edge.getValue())); + } + + } + } + + @SuppressWarnings("serial") + public static final class AddScoreToVertexValuesMapper implements MapFunction, Tuple2> { + + @Override + public Tuple2 map(Vertex vertex) throws Exception { + return new Tuple2(vertex.getValue(), 1.0); + } + } + + @SuppressWarnings("serial") + public static final class RemoveScoreFromVertexValuesMapper implements MapFunction>, Long> { + + @Override + public Long map(Vertex> vertex) throws Exception { + return vertex.getValue().f0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java deleted file mode 100644 index 6f72deb..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; - -import java.util.Map; -import java.util.TreeMap; - -/** - * Community Detection Algorithm. - * - * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value. - * The vertices propagate their labels and max scores in iterations, each time adopting the label with the - * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction - * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value. - * - * The algorithm converges when vertices no longer update their value or when the maximum number of iterations - * is reached. - * - * @see article explaining the algorithm in detail - */ -public class CommunityDetectionAlgorithm implements GraphAlgorithm { - - private Integer maxIterations; - - private Double delta; - - public CommunityDetectionAlgorithm(Integer maxIterations, Double delta) { - - this.maxIterations = maxIterations; - this.delta = delta; - } - - @Override - public Graph run(Graph graph) { - - Graph undirectedGraph = graph.getUndirected(); - - Graph, Double> graphWithScoredVertices = undirectedGraph - .mapVertices(new AddScoreToVertexValuesMapper()); - - return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta), - new LabelMessenger(), maxIterations) - .mapVertices(new RemoveScoreFromVertexValuesMapper()); - } - - @SuppressWarnings("serial") - public static final class VertexLabelUpdater extends VertexUpdateFunction, Tuple2> { - - private Double delta; - - public VertexLabelUpdater(Double delta) { - this.delta = delta; - } - - @Override - public void updateVertex(Vertex> vertex, - MessageIterator> inMessages) throws Exception { - - // we would like these two maps to be ordered - Map receivedLabelsWithScores = new TreeMap(); - Map labelsWithHighestScore = new TreeMap(); - - for (Tuple2 message : inMessages) { - // split the message into received label and score - Long receivedLabel = message.f0; - Double receivedScore = message.f1; - - // if the label was received before - if (receivedLabelsWithScores.containsKey(receivedLabel)) { - Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel); - receivedLabelsWithScores.put(receivedLabel, newScore); - } else { - // first time we see the label - receivedLabelsWithScores.put(receivedLabel, receivedScore); - } - - // store the labels with the highest scores - if (labelsWithHighestScore.containsKey(receivedLabel)) { - Double currentScore = labelsWithHighestScore.get(receivedLabel); - if (currentScore < receivedScore) { - // record the highest score - labelsWithHighestScore.put(receivedLabel, receivedScore); - } - } else { - // first time we see this label - labelsWithHighestScore.put(receivedLabel, receivedScore); - } - } - - if(receivedLabelsWithScores.size() > 0) { - // find the label with the highest score from the ones received - Double maxScore = -Double.MAX_VALUE; - Long maxScoreLabel = vertex.getValue().f0; - for (Long curLabel : receivedLabelsWithScores.keySet()) { - - if (receivedLabelsWithScores.get(curLabel) > maxScore) { - maxScore = receivedLabelsWithScores.get(curLabel); - maxScoreLabel = curLabel; - } - } - - // find the highest score of maxScoreLabel - Double highestScore = labelsWithHighestScore.get(maxScoreLabel); - // re-score the new label - if (maxScoreLabel != vertex.getValue().f0) { - highestScore -= delta / getSuperstepNumber(); - } - // else delta = 0 - // update own label - setNewVertexValue(new Tuple2(maxScoreLabel, highestScore)); - } - } - } - - @SuppressWarnings("serial") - public static final class LabelMessenger extends MessagingFunction, - Tuple2, Double> { - - @Override - public void sendMessages(Vertex> vertex) throws Exception { - - for(Edge edge : getEdges()) { - sendMessageTo(edge.getTarget(), new Tuple2(vertex.getValue().f0, - vertex.getValue().f1 * edge.getValue())); - } - - } - } - - @SuppressWarnings("serial") - public static final class AddScoreToVertexValuesMapper implements MapFunction, Tuple2> { - - @Override - public Tuple2 map(Vertex vertex) throws Exception { - return new Tuple2(vertex.getValue(), 1.0); - } - } - - @SuppressWarnings("serial") - public static final class RemoveScoreFromVertexValuesMapper implements MapFunction>, Long> { - - @Override - public Long map(Vertex> vertex) throws Exception { - return vertex.getValue().f0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java new file mode 100644 index 0000000..c2cec18 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.types.NullValue; + +/** + * A vertex-centric implementation of the Connected components algorithm. + * + * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their + * current component ID in iterations, each time adopting a new value from the received neighbor IDs, + * provided that the value is less than the current minimum. + * + * The algorithm converges when vertices no longer update their value or when the maximum number of iterations + * is reached. + */ +@SuppressWarnings("serial") +public class ConnectedComponents implements GraphAlgorithm{ + + private Integer maxIterations; + + public ConnectedComponents(Integer maxIterations) { + this.maxIterations = maxIterations; + } + + @Override + public Graph run(Graph graph) throws Exception { + + Graph undirectedGraph = graph.getUndirected(); + + // initialize vertex values and run the Vertex Centric Iteration + return undirectedGraph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations); + } + + /** + * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages. + */ + public static final class CCUpdater extends VertexUpdateFunction { + + @Override + public void updateVertex(Vertex vertex, MessageIterator messages) throws Exception { + long min = Long.MAX_VALUE; + + for (long msg : messages) { + min = Math.min(min, msg); + } + + // update vertex value, if new minimum + if (min < vertex.getValue()) { + setNewVertexValue(min); + } + } + } + + /** + * Distributes the minimum ID associated with a given vertex among all the target vertices. + */ + public static final class CCMessenger extends MessagingFunction { + + @Override + public void sendMessages(Vertex vertex) throws Exception { + // send current minimum to neighbors + sendMessageToAllNeighbors(vertex.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java deleted file mode 100644 index 7b536e5..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.types.NullValue; - -/** - * Connected components algorithm. - * - * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their - * current component ID in iterations, each time adopting a new value from the received neighbor IDs, - * provided that the value is less than the current minimum. - * - * The algorithm converges when vertices no longer update their value or when the maximum number of iterations - * is reached. - */ -@SuppressWarnings("serial") -public class ConnectedComponentsAlgorithm implements GraphAlgorithm{ - - private Integer maxIterations; - - public ConnectedComponentsAlgorithm(Integer maxIterations) { - this.maxIterations = maxIterations; - } - - @Override - public Graph run(Graph graph) throws Exception { - - Graph undirectedGraph = graph.getUndirected(); - - // initialize vertex values and run the Vertex Centric Iteration - return undirectedGraph.runVertexCentricIteration(new CCUpdater(), - new CCMessenger(), maxIterations); - } - - /** - * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages. - */ - public static final class CCUpdater extends VertexUpdateFunction { - - @Override - public void updateVertex(Vertex vertex, MessageIterator messages) throws Exception { - long min = Long.MAX_VALUE; - - for (long msg : messages) { - min = Math.min(min, msg); - } - - // update vertex value, if new minimum - if (min < vertex.getValue()) { - setNewVertexValue(min); - } - } - } - - /** - * Distributes the minimum ID associated with a given vertex among all the target vertices. - */ - public static final class CCMessenger extends MessagingFunction { - - @Override - public void sendMessages(Vertex vertex) throws Exception { - // send current minimum to neighbors - sendMessageToAllNeighbors(vertex.getValue()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index 9b75c92..b77ca07 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -16,71 +16,35 @@ * limitations under the License. */ -package org.apache.flink.graph.example; +package org.apache.flink.graph.library; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; /** - * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration + * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration. */ -public class GSAConnectedComponents implements ProgramDescription { +public class GSAConnectedComponents implements GraphAlgorithm { - // -------------------------------------------------------------------------------------------- - // Program - // -------------------------------------------------------------------------------------------- - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> edges = getEdgeDataSet(env); - - Graph graph = Graph.fromDataSet(edges, new InitVertices(), env); - - // Execute the GSA iteration - Graph result = - graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), - new UpdateComponentId(), maxIterations); - - // Extract the vertices as the result - DataSet> connectedComponents = result.getVertices(); - - // emit result - if (fileOutput) { - connectedComponents.writeAsCsv(outputPath, "\n", " "); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("GSA Connected Components"); - } else { - connectedComponents.print(); - } + private Integer maxIterations; + public GSAConnectedComponents(Integer maxIterations) { + this.maxIterations = maxIterations; } - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction { + @Override + public Graph run(Graph graph) throws Exception { - public Long map(Long vertexId) { - return vertexId; - } + Graph undirectedGraph = graph.getUndirected(); + + // initialize vertex values and run the Vertex Centric Iteration + return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(), + maxIterations); } // -------------------------------------------------------------------------------------------- @@ -112,67 +76,4 @@ public class GSAConnectedComponents implements ProgramDescription { } } } - - // -------------------------------------------------------------------------------------------- - // Util methods - // -------------------------------------------------------------------------------------------- - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - - private static int maxIterations = 16; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - - if (args.length != 3) { - System.err.println("Usage: GSAConnectedComponents " + - " "); - return false; - } - - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = Integer.parseInt(args[2]); - } else { - System.out.println("Executing GSA Connected Components example with built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: GSAConnectedComponents "); - } - return true; - } - - @SuppressWarnings("serial") - private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgeInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction, Edge>() { - - public Edge map(Tuple2 value) throws Exception { - return new Edge(value.f0, value.f1, NullValue.getInstance()); - } - }); - } - - // Generates 3 components of size 2 - return env.generateSequence(0, 2).flatMap(new FlatMapFunction>() { - @Override - public void flatMap(Long value, Collector> out) throws Exception { - out.collect(new Edge(value, value + 3, NullValue.getInstance())); - } - }); - } - - @Override - public String getDescription() { - return "GSA Connected Components"; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java index 45d4555..4299381 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java @@ -16,86 +16,35 @@ * limitations under the License. */ -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; +package org.apache.flink.graph.library; + import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.util.Collector; /** - * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration. - * - * The edges input file is expected to contain one edge per line, with long IDs and no - * values, in the following format:"\t". - * - * If no arguments are provided, the example runs with a random graph of 10 vertices - * and random edge weights. + * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration. */ -public class GSAPageRank implements ProgramDescription { - - @SuppressWarnings("serial") - public static void main(String[] args) throws Exception { +public class GSAPageRank implements GraphAlgorithm { - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> links = getLinksDataSet(env); + private double beta; + private int maxIterations; - Graph network = Graph.fromDataSet(links, new MapFunction() { - - @Override - public Double map(Long value) throws Exception { - return 1.0; - } - }, env); - - DataSet> vertexOutDegrees = network.outDegrees(); - - // Assign the transition probabilities as the edge weights - Graph networkWithWeights = network - .joinWithEdgesOnSource(vertexOutDegrees, - new MapFunction, Double>() { - - @Override - public Double map(Tuple2 value) { - return value.f0 / value.f1; - } - }); - - long numberOfVertices = networkWithWeights.numberOfVertices(); - - // Execute the GSA iteration - Graph result = networkWithWeights - .runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(), - new UpdateRanks(numberOfVertices), maxIterations); - - // Extract the vertices as the result - DataSet> pageRanks = result.getVertices(); + public GSAPageRank(double beta, int maxIterations) { + this.beta = beta; + this.maxIterations = maxIterations; + } - // emit result - if (fileOutput) { - pageRanks.writeAsCsv(outputPath, "\n", "\t"); + @Override + public Graph run(Graph network) throws Exception { - // since file sinks are lazy, we trigger the execution explicitly - env.execute("GSA Page Ranks"); - } else { - pageRanks.print(); - } + final long numberOfVertices = network.numberOfVertices(); + return network.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(), + new UpdateRanks(beta, numberOfVertices), maxIterations); } // -------------------------------------------------------------------------------------------- @@ -133,83 +82,19 @@ public class GSAPageRank implements ProgramDescription { } @SuppressWarnings("serial") - private static final class UpdateRanks extends ApplyFunction { + private static final class UpdateRanks extends ApplyFunction { - long numberOfVertices; + private final double beta; + private final long numVertices; - public UpdateRanks(long numberOfVertices) { - this.numberOfVertices = numberOfVertices; + public UpdateRanks(double beta, long numberOfVertices) { + this.beta = beta; + this.numVertices = numberOfVertices; } @Override public void apply(Double rankSum, Double currentValue) { - setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static final double DAMPENING_FACTOR = 0.85; - private static long numPages = 10; - private static String edgeInputPath = null; - private static String outputPath = null; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage: GSAPageRank "); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = Integer.parseInt(args[2]); - } else { - System.out.println("Executing GSAPageRank example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: GSAPageRank "); - } - return true; - } - - @SuppressWarnings("serial") - private static DataSet> getLinksDataSet(ExecutionEnvironment env) { - - if (fileOutput) { - return env.readCsvFile(edgeInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction, Edge>() { - public Edge map(Tuple2 input) { - return new Edge(input.f0, input.f1, 1.0); - } - }).withForwardedFields("f0; f1"); + setResult((1-beta)/numVertices + beta * rankSum); } - - return env.generateSequence(1, numPages).flatMap( - new FlatMapFunction>() { - @Override - public void flatMap(Long key, - Collector> out) throws Exception { - int numOutEdges = (int) (Math.random() * (numPages / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numPages) + 1; - out.collect(new Edge(key, target, 1.0)); - } - } - }); - } - - @Override - public String getDescription() { - return "GSA Page Rank"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java index b01aa23..78c535b 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java @@ -16,78 +16,52 @@ * limitations under the License. */ -package org.apache.flink.graph.example; +package org.apache.flink.graph.library; -import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration */ -public class GSASingleSourceShortestPaths implements ProgramDescription { +public class GSASingleSourceShortestPaths implements GraphAlgorithm { - // -------------------------------------------------------------------------------------------- - // Program - // -------------------------------------------------------------------------------------------- - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } + private final K srcVertexId; + private final Integer maxIterations; - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> edges = getEdgeDataSet(env); + public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) { + this.srcVertexId = srcVertexId; + this.maxIterations = maxIterations; + } - Graph graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); + @Override + public Graph run(Graph input) { - // Execute the GSA iteration - Graph result = graph + return input.mapVertices(new InitVerticesMapper(srcVertexId)) .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), - new UpdateDistance(), maxIterations); - - // Extract the vertices as the result - DataSet> singleSourceShortestPaths = result.getVertices(); - - // emit result - if(fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " "); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("GSA Single Source Shortest Paths"); - } else { - singleSourceShortestPaths.print(); - } - + new UpdateDistance(), maxIterations); } @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction{ + public static final class InitVerticesMapper implements MapFunction, Double> { - private long srcId; + private K srcVertexId; - public InitVertices(long srcId) { - this.srcId = srcId; + public InitVerticesMapper(K srcId) { + this.srcVertexId = srcId; } - public Double map(Long id) { - if (id.equals(srcId)) { + public Double map(Vertex value) { + if (value.f0.equals(srcVertexId)) { return 0.0; - } - else { - return Double.POSITIVE_INFINITY; + } else { + return Double.MAX_VALUE; } } } @@ -113,7 +87,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription { }; @SuppressWarnings("serial") - private static final class UpdateDistance extends ApplyFunction { + private static final class UpdateDistance extends ApplyFunction { public void apply(Double newDistance, Double oldDistance) { if (newDistance < oldDistance) { @@ -121,60 +95,4 @@ public class GSASingleSourceShortestPaths implements ProgramDescription { } } } - - // -------------------------------------------------------------------------------------------- - // Util methods - // -------------------------------------------------------------------------------------------- - - private static boolean fileOutput = false; - - private static Long srcVertexId = 1l; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: GSASingleSourceShortestPaths " + - " "); - return false; - } - - fileOutput = true; - srcVertexId = Long.parseLong(args[0]); - edgesInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); - } else { - System.out.println("Executing GSASingle Source Shortest Paths example " - + "with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: GSASingleSourceShortestPaths " + - " "); - } - return true; - } - - private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); - } - } - - @Override - public String getDescription() { - return "GSA Single Source Shortest Paths"; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java new file mode 100644 index 0000000..1648922 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.types.NullValue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * An implementation of the label propagation algorithm. The iterative algorithm + * detects communities by propagating labels. In each iteration, a vertex adopts + * the label that is most frequent among its neighbors' labels. Labels are + * represented by Longs and we assume a total ordering among them, in order to + * break ties. The algorithm converges when no vertex changes its value or the + * maximum number of iterations have been reached. Note that different + * initializations might lead to different results. + * + */ +@SuppressWarnings("serial") + +public class LabelPropagation> implements GraphAlgorithm { + + private final int maxIterations; + + public LabelPropagation(int maxIterations) { + this.maxIterations = maxIterations; + } + + @Override + public Graph run(Graph input) { + + // iteratively adopt the most frequent label among the neighbors + // of each vertex + return input.runVertexCentricIteration(new UpdateVertexLabel(), new SendNewLabelToNeighbors(), + maxIterations); + } + + /** + * Function that updates the value of a vertex by adopting the most frequent + * label among its in-neighbors + */ + public static final class UpdateVertexLabel extends VertexUpdateFunction { + + public void updateVertex(Vertex vertex, + MessageIterator inMessages) { + Map labelsWithFrequencies = new HashMap(); + + long maxFrequency = 1; + long mostFrequentLabel = vertex.getValue(); + + // store the labels with their frequencies + for (Long msg : inMessages) { + if (labelsWithFrequencies.containsKey(msg)) { + long currentFreq = labelsWithFrequencies.get(msg); + labelsWithFrequencies.put(msg, currentFreq + 1); + } else { + labelsWithFrequencies.put(msg, 1L); + } + } + // select the most frequent label: if two or more labels have the + // same frequency, + // the node adopts the label with the highest value + for (Entry entry : labelsWithFrequencies.entrySet()) { + if (entry.getValue() == maxFrequency) { + // check the label value to break ties + if (entry.getKey() > mostFrequentLabel) { + mostFrequentLabel = entry.getKey(); + } + } else if (entry.getValue() > maxFrequency) { + maxFrequency = entry.getValue(); + mostFrequentLabel = entry.getKey(); + } + } + + // set the new vertex value + setNewVertexValue(mostFrequentLabel); + } + } + + /** + * Sends the vertex label to all out-neighbors + */ + public static final class SendNewLabelToNeighbors extends MessagingFunction { + + public void sendMessages(Vertex vertex) { + sendMessageToAllNeighbors(vertex.getValue()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java deleted file mode 100644 index 0b0f4fc..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.types.NullValue; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** - * An implementation of the label propagation algorithm. The iterative algorithm - * detects communities by propagating labels. In each iteration, a vertex adopts - * the label that is most frequent among its neighbors' labels. Labels are - * represented by Longs and we assume a total ordering among them, in order to - * break ties. The algorithm converges when no vertex changes its value or the - * maximum number of iterations have been reached. Note that different - * initializations might lead to different results. - * - */ -@SuppressWarnings("serial") - -public class LabelPropagationAlgorithm & Serializable> - implements GraphAlgorithm { - - private final int maxIterations; - - public LabelPropagationAlgorithm(int maxIterations) { - this.maxIterations = maxIterations; - } - - @Override - public Graph run(Graph input) { - - // iteratively adopt the most frequent label among the neighbors - // of each vertex - return input.runVertexCentricIteration(new UpdateVertexLabel(), new SendNewLabelToNeighbors(), - maxIterations); - } - - /** - * Function that updates the value of a vertex by adopting the most frequent - * label among its in-neighbors - */ - public static final class UpdateVertexLabel extends VertexUpdateFunction { - - public void updateVertex(Vertex vertex, - MessageIterator inMessages) { - Map labelsWithFrequencies = new HashMap(); - - long maxFrequency = 1; - long mostFrequentLabel = vertex.getValue(); - - // store the labels with their frequencies - for (Long msg : inMessages) { - if (labelsWithFrequencies.containsKey(msg)) { - long currentFreq = labelsWithFrequencies.get(msg); - labelsWithFrequencies.put(msg, currentFreq + 1); - } else { - labelsWithFrequencies.put(msg, 1L); - } - } - // select the most frequent label: if two or more labels have the - // same frequency, - // the node adopts the label with the highest value - for (Entry entry : labelsWithFrequencies.entrySet()) { - if (entry.getValue() == maxFrequency) { - // check the label value to break ties - if (entry.getKey() > mostFrequentLabel) { - mostFrequentLabel = entry.getKey(); - } - } else if (entry.getValue() > maxFrequency) { - maxFrequency = entry.getValue(); - mostFrequentLabel = entry.getKey(); - } - } - - // set the new vertex value - setNewVertexValue(mostFrequentLabel); - } - } - - /** - * Sends the vertex label to all out-neighbors - */ - public static final class SendNewLabelToNeighbors extends MessagingFunction { - - public void sendMessages(Vertex vertex) { - sendMessageToAllNeighbors(vertex.getValue()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java new file mode 100644 index 0000000..03cb740 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration. + */ +public class PageRank implements GraphAlgorithm { + + private double beta; + private int maxIterations; + + public PageRank(double beta, int maxIterations) { + this.beta = beta; + this.maxIterations = maxIterations; + } + + @Override + public Graph run(Graph network) throws Exception { + + final long numberOfVertices = network.numberOfVertices(); + + return network.runVertexCentricIteration(new VertexRankUpdater(beta, numberOfVertices), + new RankMessenger(numberOfVertices), maxIterations); + } + + /** + * Function that updates the rank of a vertex by summing up the partial + * ranks from all incoming messages and then applying the dampening formula. + */ + @SuppressWarnings("serial") + public static final class VertexRankUpdater extends VertexUpdateFunction { + + private final double beta; + private final long numVertices; + + public VertexRankUpdater(double beta, long numberOfVertices) { + this.beta = beta; + this.numVertices = numberOfVertices; + } + + @Override + public void updateVertex(Vertex vertex, MessageIterator inMessages) { + double rankSum = 0.0; + for (double msg : inMessages) { + rankSum += msg; + } + + // apply the dampening factor / random jump + double newRank = (beta * rankSum) + (1 - beta) / numVertices; + setNewVertexValue(newRank); + } + } + + /** + * Distributes the rank of a vertex among all target vertices according to + * the transition probability, which is associated with an edge as the edge + * value. + */ + @SuppressWarnings("serial") + public static final class RankMessenger extends MessagingFunction { + + private final long numVertices; + + public RankMessenger(long numberOfVertices) { + this.numVertices = numberOfVertices; + } + + @Override + public void sendMessages(Vertex vertex) { + if (getSuperstepNumber() == 1) { + // initialize vertex ranks + vertex.setValue(new Double(1.0 / numVertices)); + } + + for (Edge edge : getEdges()) { + sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java deleted file mode 100644 index f63fb0d..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; - -import java.io.Serializable; - - -public class PageRankAlgorithm & Serializable> implements - GraphAlgorithm { - - private double beta; - private int maxIterations; - - public PageRankAlgorithm(double beta, int maxIterations) { - this.beta = beta; - this.maxIterations = maxIterations; - } - - @Override - public Graph run(Graph network) throws Exception { - - final long numberOfVertices = network.numberOfVertices(); - return network.runVertexCentricIteration(new VertexRankUpdater(beta, numberOfVertices), new RankMessenger(numberOfVertices), - maxIterations); - } - - /** - * Function that updates the rank of a vertex by summing up the partial - * ranks from all incoming messages and then applying the dampening formula. - */ - @SuppressWarnings("serial") - public static final class VertexRankUpdater extends VertexUpdateFunction { - - private final double beta; - private final long numVertices; - - public VertexRankUpdater(double beta, long numberOfVertices) { - this.beta = beta; - this.numVertices = numberOfVertices; - } - - @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) { - double rankSum = 0.0; - for (double msg : inMessages) { - rankSum += msg; - } - - // apply the dampening factor / random jump - double newRank = (beta * rankSum) + (1 - beta) / numVertices; - setNewVertexValue(newRank); - } - } - - /** - * Distributes the rank of a vertex among all target vertices according to - * the transition probability, which is associated with an edge as the edge - * value. - */ - @SuppressWarnings("serial") - public static final class RankMessenger extends MessagingFunction { - - private final long numVertices; - - public RankMessenger(long numberOfVertices) { - this.numVertices = numberOfVertices; - } - - @Override - public void sendMessages(Vertex vertex) { - if (getSuperstepNumber() == 1) { - // initialize vertex ranks - vertex.setValue(new Double(1.0 / numVertices)); - } - - for (Edge edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java new file mode 100644 index 0000000..97ea000 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration. + */ +@SuppressWarnings("serial") +public class SingleSourceShortestPaths implements GraphAlgorithm { + + private final K srcVertexId; + private final Integer maxIterations; + + public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) { + this.srcVertexId = srcVertexId; + this.maxIterations = maxIterations; + } + + @Override + public Graph run(Graph input) { + + return input.mapVertices(new InitVerticesMapper(srcVertexId)) + .runVertexCentricIteration(new VertexDistanceUpdater(), new MinDistanceMessenger(), + maxIterations); + } + + public static final class InitVerticesMapper implements MapFunction, Double> { + + private K srcVertexId; + + public InitVerticesMapper(K srcId) { + this.srcVertexId = srcId; + } + + public Double map(Vertex value) { + if (value.f0.equals(srcVertexId)) { + return 0.0; + } else { + return Double.MAX_VALUE; + } + } + } + + /** + * Function that updates the value of a vertex by picking the minimum + * distance from all incoming messages. + * + * @param + */ + public static final class VertexDistanceUpdater extends VertexUpdateFunction { + + @Override + public void updateVertex(Vertex vertex, + MessageIterator inMessages) { + + Double minDistance = Double.MAX_VALUE; + + for (double msg : inMessages) { + if (msg < minDistance) { + minDistance = msg; + } + } + + if (vertex.getValue() > minDistance) { + setNewVertexValue(minDistance); + } + } + } + + /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. + * + * @param + */ + public static final class MinDistanceMessenger extends MessagingFunction { + + @Override + public void sendMessages(Vertex vertex) + throws Exception { + for (Edge edge : getEdges()) { + sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java deleted file mode 100644 index e78ae3e..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; - -import java.io.Serializable; - -@SuppressWarnings("serial") -public class SingleSourceShortestPathsAlgorithm & Serializable> - implements GraphAlgorithm { - - private final K srcVertexId; - private final Integer maxIterations; - - public SingleSourceShortestPathsAlgorithm(K srcVertexId, Integer maxIterations) { - this.srcVertexId = srcVertexId; - this.maxIterations = maxIterations; - } - - @Override - public Graph run(Graph input) { - - return input.mapVertices(new InitVerticesMapper(srcVertexId)) - .runVertexCentricIteration(new VertexDistanceUpdater(), new MinDistanceMessenger(), - maxIterations); - } - - public static final class InitVerticesMapper implements MapFunction, Double> { - - private K srcVertexId; - - public InitVerticesMapper(K srcId) { - this.srcVertexId = srcId; - } - - public Double map(Vertex value) { - if (value.f0.equals(srcVertexId)) { - return 0.0; - } else { - return Double.MAX_VALUE; - } - } - } - - /** - * Function that updates the value of a vertex by picking the minimum - * distance from all incoming messages. - * - * @param - */ - public static final class VertexDistanceUpdater extends VertexUpdateFunction { - - @Override - public void updateVertex(Vertex vertex, - MessageIterator inMessages) { - - Double minDistance = Double.MAX_VALUE; - - for (double msg : inMessages) { - if (msg < minDistance) { - minDistance = msg; - } - } - - if (vertex.getValue() > minDistance) { - setNewVertexValue(minDistance); - } - } - } - - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. - * - * @param - */ - public static final class MinDistanceMessenger extends MessagingFunction { - - @Override - public void sendMessages(Vertex vertex) - throws Exception { - for (Edge edge : getEdges()) { - sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue()); - } - } - } -} \ No newline at end of file