flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [20/50] [abbrv] flink git commit: [FLINK-1201] [gelly] changed pageRank example to use joinWithEdgesOnSource
Date Wed, 11 Feb 2015 10:49:22 GMT
[FLINK-1201] [gelly] changed pageRank example to use joinWithEdgesOnSource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/753c71ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/753c71ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/753c71ae

Branch: refs/heads/master
Commit: 753c71ae4a40d1414c65319e783150de9cd4177e
Parents: e0c10ec
Author: vasia <vasilikikalavri@gmail.com>
Authored: Sat Jan 10 18:32:01 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 90 +++++++++-----------
 .../flink/graph/example/PageRankExample.java    | 18 +++-
 .../flink/graph/test/TestJoinWithEdges.java     |  3 +-
 3 files changed, 58 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 71a701b..51b8c30 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -88,7 +88,6 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @return
  	 */
 	public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
-
 		return validator.validate(this);
 	}
 
@@ -184,39 +183,39 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @return - a new graph where the vertex values have been updated.
 	 */
 	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>>
inputDataSet,
-												 final MapFunction<Tuple2<VV, T>, VV> mapper) {
+			final MapFunction<Tuple2<VV, T>, VV> mapper) {
 
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
 				.coGroup(inputDataSet).where(0).equalTo(0)
 				.with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
-
 		return Graph.create(resultedVertices, this.getEdges(), this.getContext());
 	}
 
 	private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> &
Serializable,
-			VV extends Serializable, T>
-			implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>>
{
+			VV extends Serializable, T>	implements CoGroupFunction<Vertex<K, VV>, Tuple2<K,
T>, Vertex<K, VV>> {
 
 		private MapFunction<Tuple2<VV, T>, VV> mapper;
-
 		public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
 			this.mapper = mapper;
 		}
 
 		@Override
-		public void coGroup(Iterable<Vertex<K, VV>> iterableDS1, Iterable<Tuple2<K,
T>> iterableDS2,
+		public void coGroup(Iterable<Vertex<K, VV>> vertices, Iterable<Tuple2<K,
T>> input,
 							Collector<Vertex<K, VV>> collector) throws Exception {
 
-			Iterator<Vertex<K, VV>> iteratorDS1 = iterableDS1.iterator();
-			Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator();
+			final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator();
+			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
 
-			if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) {
-				Tuple2<K, T> iteratorDS2Next = iteratorDS2.next();
+			if (vertexIterator.hasNext()) {
+				if(inputIterator.hasNext()) {
+					final Tuple2<K, T> inputNext = inputIterator.next();
 
-				collector.collect(new Vertex<K, VV>(iteratorDS2Next.f0, mapper
-						.map(new Tuple2<VV, T>(iteratorDS1.next().f1, iteratorDS2Next.f1))));
-			} else if(iteratorDS1.hasNext()) {
-				collector.collect(iteratorDS1.next());
+					collector.collect(new Vertex<K, VV>(inputNext.f0, mapper
+							.map(new Tuple2<VV, T>(vertexIterator.next().f1, inputNext.f1))));
+				} else {
+					collector.collect(vertexIterator.next());
+				}
+				
 			}
 		}
 	}
@@ -229,13 +228,12 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @param <T>
 	 * @return - a new graph where the edge values have been updated.
 	 */
-	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>>
inputDataSet,
-											  final MapFunction<Tuple2<EV, T>, EV> mapper) {
+	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>>
inputDataSet, 
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0,1).equalTo(0,1)
 				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
-
 		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
 	}
 
@@ -244,27 +242,27 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>>
{
 
 		private MapFunction<Tuple2<EV, T>, EV> mapper;
-
 		public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
 			this.mapper = mapper;
 		}
 
 		@Override
-		public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
-							Iterable<Tuple3<K, K, T>> iterableDS2,
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+							Iterable<Tuple3<K, K, T>> input,
 							Collector<Edge<K, EV>> collector) throws Exception {
 
-			Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator();
-			Iterator<Tuple3<K, K, T>> iteratorDS2 = iterableDS2.iterator();
+			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
+			final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
 
-			if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) {
-				Tuple3<K, K, T> iteratorDS2Next = iteratorDS2.next();
+			if (edgesIterator.hasNext()) {
+				if(inputIterator.hasNext()) {
+					final Tuple3<K, K, T> inputNext = inputIterator.next();
 
-				collector.collect(new Edge<K, EV>(iteratorDS2Next.f0, iteratorDS2Next.f1, mapper
-						.map(new Tuple2<EV, T>(iteratorDS1.next().f2, iteratorDS2Next.f2))));
-
-			} else if(iteratorDS1.hasNext()) {
-				collector.collect(iteratorDS1.next());
+					collector.collect(new Edge<K, EV>(inputNext.f0, inputNext.f1, mapper
+							.map(new Tuple2<EV, T>(edgesIterator.next().f2, inputNext.f2))));
+				} else {
+					collector.collect(edgesIterator.next());
+				}
 			}
 		}
 	}
@@ -279,7 +277,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @return - a new graph where the edge values have been updated.
 	 */
 	public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>>
inputDataSet,
-												 final MapFunction<Tuple2<EV, T>, EV> mapper) {
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0).equalTo(0)
@@ -289,37 +287,33 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	}
 
 	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K>
& Serializable,
-			EV extends Serializable, T>
-			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>>
{
+			EV extends Serializable, T>	implements CoGroupFunction<Edge<K, EV>, Tuple2<K,
T>, Edge<K, EV>> {
 
 		private MapFunction<Tuple2<EV, T>, EV> mapper;
-
 		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>,
EV> mapper) {
 			this.mapper = mapper;
 		}
 
-
 		@Override
-		public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
-							Iterable<Tuple2<K, T>> iterableDS2,
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>>
input,
 							Collector<Edge<K, EV>> collector) throws Exception {
 
-			Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator();
-			Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator();
+			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
+			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
 
-			if(iteratorDS2.hasNext()) {
-				Tuple2<K, T> iteratorDS2Next = iteratorDS2.next();
+			if(inputIterator.hasNext()) {
+				final Tuple2<K, T> inputNext = inputIterator.next();
 
-				while(iteratorDS1.hasNext()) {
-					Edge<K, EV> iteratorDS1Next = iteratorDS1.next();
+				while(edgesIterator.hasNext()) {
+					Edge<K, EV> edgesNext = edgesIterator.next();
 
-					collector.collect(new Edge<K, EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper
-							.map(new Tuple2<EV, T>(iteratorDS1Next.f2, iteratorDS2Next.f1))));
+					collector.collect(new Edge<K, EV>(edgesNext.f0, edgesNext.f1, mapper
+							.map(new Tuple2<EV, T>(edgesNext.f2, inputNext.f1))));
 				}
 
 			} else {
-				while(iteratorDS1.hasNext()) {
-					collector.collect(iteratorDS1.next());
+				while(edgesIterator.hasNext()) {
+					collector.collect(edgesIterator.next());
 				}
 			}
 		}
@@ -335,7 +329,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @return - a new graph where the edge values have been updated.
 	 */
 	public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>>
inputDataSet,
-													  final MapFunction<Tuple2<EV, T>, EV> mapper) {
+			final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(1).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index 0fc8084..e3f815a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -8,11 +8,13 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.*;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
 public class PageRankExample implements ProgramDescription {
 
-    public static void main (String [] args) throws Exception {
+    @SuppressWarnings("serial")
+	public static void main (String [] args) throws Exception {
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -21,9 +23,19 @@ public class PageRankExample implements ProgramDescription {
         DataSet<Edge<Long,Double>> links = getLinksDataSet(env);
 
         Graph<Long, Double, Double> network = new Graph<Long, Double, Double>(pages,
links, env);
+        
+        DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+        
+        // assign the transition probabilities as the edge weights
+        Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,

+        		new MapFunction<Tuple2<Double, Long>, Double>() {
+					public Double map(Tuple2<Double, Long> value) {
+						return value.f0 / value.f1;
+					}
+				});
 
         DataSet<Vertex<Long,Double>> pageRanks =
-                network.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices();
+        		networkWithWeights.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices();
 
         pageRanks.print();
 
@@ -60,7 +72,7 @@ public class PageRankExample implements ProgramDescription {
                             int numOutEdges = (int) (Math.random() * (numPages / 2));
                             for (int i = 0; i < numOutEdges; i++) {
                                 long target = (long) (Math.random() * numPages) + 1;
-                                out.collect(new Edge<Long, Double>(key, target, 1.0
/ numOutEdges));
+                                out.collect(new Edge<Long, Double>(key, target, 1.0));
                             }
                         }
                     });

http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
index 711cd61..7375d0c 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -580,5 +580,4 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
             }
         }
     }
-}
-
+}
\ No newline at end of file


Mime
View raw message