flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [4/4] flink git commit: [gelly] made the number of vertices an optional parameter of PageRank; added the edge weight initialization to the library methods
Date Sat, 22 Aug 2015 19:39:35 GMT
[gelly] made the number of vertices an optional parameter of PageRank; added the edge weight
initialization to the library methods


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ae84273
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ae84273
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ae84273

Branch: refs/heads/master
Commit: 5ae84273ce60899ba118b8d21b40587a71515f9b
Parents: 8f35988
Author: vasia <vasia@apache.org>
Authored: Sat Aug 8 13:36:47 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Sat Aug 22 20:47:00 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  1 -
 .../apache/flink/graph/library/GSAPageRank.java | 37 ++++++++++++++-
 .../apache/flink/graph/library/PageRank.java    | 44 +++++++++++++++++-
 .../graph/test/library/PageRankITCase.java      | 47 +++++++++++---------
 4 files changed, 104 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 8552c01..ff0ec24 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1145,7 +1145,6 @@ public class Graph<K, VV, EV> {
 	 * @return the new graph containing the existing vertices and edges plus the
 	 *         newly added edge
 	 */
-	@SuppressWarnings("unchecked")
 	public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
 		Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
 				Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),

http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 4299381..4adaaa9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.gsa.ApplyFunction;
@@ -27,23 +30,46 @@ import org.apache.flink.graph.gsa.SumFunction;
 
 /**
  * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ * 
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
 public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
 
 	private double beta;
 	private int maxIterations;
+	private long numberOfVertices;
 
+	/**
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
 	public GSAPageRank(double beta, int maxIterations) {
 		this.beta = beta;
 		this.maxIterations = maxIterations;
 	}
 
+	public GSAPageRank(double beta, long numVertices, int maxIterations) {
+		this.beta = beta;
+		this.numberOfVertices = numVertices;
+		this.maxIterations = maxIterations;
+	}
+
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
-		final long numberOfVertices = network.numberOfVertices();
+		if (numberOfVertices == 0) {
+			numberOfVertices = network.numberOfVertices();
+		}
+
+		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
-		return network.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
 				new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
 	}
 
@@ -97,4 +123,11 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
 			setResult((1-beta)/numVertices + beta * rankSum);
 		}
 	}
+
+	@SuppressWarnings("serial")
+	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+		public Double map(Tuple2<Double, Long> value) {
+			return value.f0 / value.f1;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 03cb740..93b10eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -28,23 +31,52 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
  * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ * 
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
 public class PageRank<K> implements	GraphAlgorithm<K, Double, Double> {
 
 	private double beta;
 	private int maxIterations;
+	private long numberOfVertices;
 
+	/**
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
 	public PageRank(double beta, int maxIterations) {
 		this.beta = beta;
 		this.maxIterations = maxIterations;
+		this.numberOfVertices = 0;
+	}
+
+	/**
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 * @param numVertices the number of vertices in the input
+	 */
+	public PageRank(double beta, long numVertices, int maxIterations) {
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+		this.numberOfVertices = numVertices;
 	}
 
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
-		final long numberOfVertices = network.numberOfVertices();
+		if (numberOfVertices == 0) {
+			numberOfVertices = network.numberOfVertices();
+		}
+
+		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
-		return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
 				new RankMessenger<K>(numberOfVertices), maxIterations);
 	}
 
@@ -102,4 +134,12 @@ public class PageRank<K> implements	GraphAlgorithm<K, Double, Double> {
 			}
 		}
 	}
+
+	@SuppressWarnings("serial")
+	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+		public Double map(Tuple2<Double, Long> value) {
+			return value.f0 / value.f1;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index cc1132d..cc0327f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -22,9 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.PageRankData;
@@ -51,13 +49,8 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-		
-		DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
 
-		Graph<Long, Double, Double> networkWithWeights = inputGraph
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
-        List<Vertex<Long, Double>> result = networkWithWeights.run(new PageRank<Long>(0.85, 3))
+        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
         		.getVertices().collect();
         
         compareWithDelta(result, expectedResult, 0.01);
@@ -69,13 +62,34 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-		
-		DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
 
-		Graph<Long, Double, Double> networkWithWeights = inputGraph
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+        		.getVertices().collect();
+        
+        compareWithDelta(result, expectedResult, 0.01);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+        		.getVertices().collect();
+        
+        compareWithDelta(result, expectedResult, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        List<Vertex<Long, Double>> result = networkWithWeights.run(new GSAPageRank<Long>(0.85, 3))
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
         		.getVertices().collect();
         
         compareWithDelta(result, expectedResult, 0.01);
@@ -115,11 +129,4 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 			return 1.0;
 		}
 	}
-
-	@SuppressWarnings("serial")
-	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-		public Double map(Tuple2<Double, Long> value) {
-			return value.f0 / value.f1;
-		}
-	}
 }


Mime
View raw message