flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [07/50] [abbrv] flink git commit: [FLINK-1201] [gelly] changed mapVertices to return a Graph and simplified SSSP
Date Wed, 11 Feb 2015 10:49:09 GMT
[FLINK-1201] [gelly] changed mapVertices to return a Graph and simplified SSSP


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41b9efa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41b9efa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41b9efa

Branch: refs/heads/master
Commit: f41b9efadb1d654a8f7a0982912e317df22daf13
Parents: 2815df3
Author: vasia <vasilikikalavri@gmail.com>
Authored: Sat Dec 20 12:04:03 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 21 +++----
 .../SingleSourceShortestPathsExample.java       | 41 ++----------
 .../flink/graph/example/utils/ExampleUtils.java | 28 +++++++++
 .../library/SingleSourceShortestPaths.java      | 65 +++++++-------------
 .../flink/graph/test/TestMapVertices.java       | 53 ++++++++--------
 5 files changed, 91 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f41b9efa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c1bfeb3..4f295ea 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -111,36 +111,35 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
     /**
      * Apply a function to the attribute of each vertex in the graph.
      * @param mapper
-     * @return
+     * @return a new graph
      */
-    public <NV extends Serializable> DataSet<Vertex<K, NV>> mapVertices(final MapFunction<VV, NV> mapper) {
-        return vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper));
+    public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
+    	DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper)); 
+        return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context);
     }
     
     private static final class ApplyMapperToVertexWithType<K extends Comparable<K>
& Serializable, 
     	VV extends Serializable, NV extends Serializable> implements MapFunction
 		<Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>>
{
 	
-		private MapFunction<VV, NV> innerMapper;
+		private MapFunction<Vertex<K, VV>, NV> innerMapper;
 		
-		public ApplyMapperToVertexWithType(MapFunction<VV, NV> theMapper) {
+		public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper) {
 			this.innerMapper = theMapper;
 		}
 		
 		public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
-			return new Vertex<K, NV>(value.f0, innerMapper.map(value.f1));
+			return new Vertex<K, NV>(value.f0, innerMapper.map(value));
 		}
 	
 		@Override
 		public TypeInformation<Vertex<K, NV>> getProducedType() {
-			@SuppressWarnings("unchecked")
-			TypeInformation<NV> newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper,

-					(TypeInformation<VV>)vertexValueType);
-			
+			TypeInformation<Vertex<K, VV>> vertextypeInfo = new TupleTypeInfo<Vertex<K, VV>>(keyType, vertexValueType);
+			TypeInformation<NV> newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper,
vertextypeInfo);			
 			return new TupleTypeInfo<Vertex<K, NV>>(keyType, newVertexValueType);
 		}
     }
-    
+
     /**
      * Apply a function to the attribute of each edge in the graph.
      * @param mapper

http://git-wip-us.apache.org/repos/asf/flink/blob/f41b9efa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 4b15508..75e33dc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -3,14 +3,13 @@ package flink.graphs.example;
 import flink.graphs.Edge;
 import flink.graphs.Graph;
 import flink.graphs.Vertex;
+import flink.graphs.example.utils.ExampleUtils;
 import flink.graphs.library.SingleSourceShortestPaths;
+
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class SingleSourceShortestPathsExample implements ProgramDescription {
 
     private static int maxIterations = 5;
@@ -19,54 +18,24 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<Vertex<Long,Double>> vertices = getLongDoubleVertexData(env);
+        DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env);
 
-        DataSet<Edge<Long,Double>> edges = getLongDoubleEdgeData(env);
+        DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env);
 
         Long srcVertexId = 1L;
 
         Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env);
 
         DataSet<Vertex<Long,Double>> singleSourceShortestPaths =
-                graph.run(new SingleSourceShortestPaths(srcVertexId, maxIterations)).getVertices();
+                graph.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations)).getVertices();
 
         singleSourceShortestPaths.print();
 
         env.execute();
     }
 
-
     @Override
     public String getDescription() {
         return "Single Source Shortest Paths";
     }
-
-    @SuppressWarnings("serial")
-    public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
-            ExecutionEnvironment env) {
-        List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long,
Double>>();
-        vertices.add(new Vertex(1L, 1.0));
-        vertices.add(new Vertex(2L, 2.0));
-        vertices.add(new Vertex(3L, 3.0));
-        vertices.add(new Vertex(4L, 4.0));
-        vertices.add(new Vertex(5L, 5.0));
-
-        return env.fromCollection(vertices);
-    }
-
-    @SuppressWarnings("serial")
-    public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
-            ExecutionEnvironment env) {
-        List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
-        edges.add(new Edge(1L, 2L, 12.0));
-        edges.add(new Edge(1L, 3L, 13.0));
-        edges.add(new Edge(2L, 3L, 23.0));
-        edges.add(new Edge(3L, 4L, 34.0));
-        edges.add(new Edge(3L, 5L, 35.0));
-        edges.add(new Edge(4L, 5L, 45.0));
-        edges.add(new Edge(5L, 1L, 51.0));
-
-        return env.fromCollection(edges);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f41b9efa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
index c4ff43e..4588230 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
@@ -1,6 +1,8 @@
 package flink.graphs.example.utils;
 
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -103,5 +105,31 @@ public class ExampleUtils {
 	                    }
 	                });
 	}
+
+	public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
+			ExecutionEnvironment env) {
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 1.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 4.0));
+		vertices.add(new Vertex<Long, Double>(5L, 5.0));
+
+		return env.fromCollection(vertices);
+	}
+	
+	public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
+		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
+		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
+		
+		return env.fromCollection(edges);
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f41b9efa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 6245091..632233c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -1,11 +1,8 @@
 package flink.graphs.library;
 
 import flink.graphs.*;
-import org.apache.flink.api.common.functions.FilterFunction;
+
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.spargel.java.MessageIterator;
 import org.apache.flink.spargel.java.MessagingFunction;
 import org.apache.flink.spargel.java.OutgoingEdge;
@@ -13,6 +10,7 @@ import org.apache.flink.spargel.java.VertexUpdateFunction;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> {
 
     private final K srcVertexId;
@@ -25,29 +23,39 @@ public class SingleSourceShortestPaths<K extends Comparable<K>
& Serializable> i
 
     @Override
     public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
-        DataSet<Vertex<K, Double>> sourceVertex = input.getVertices().filter(
-                new SelectVertex<K>(srcVertexId));
-
-        DataSet<Vertex<K, Double>> verticesWithInitialDistance = sourceVertex.cross(input.getVertices())
-                .map(new InitSrcVertex<K>());
-
-        Graph<K, Double, Double> graph = Graph.create(verticesWithInitialDistance,
input.getEdges(),
-                ExecutionEnvironment.getExecutionEnvironment());
 
-        return graph.runVertexCentricIteration(
+    	return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+    			.runVertexCentricIteration(
                 new VertexDistanceUpdater<K>(),
                 new MinDistanceMessenger<K>(),
                 maxIterations
         );
     }
 
+    public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>

+    	implements MapFunction<Vertex<K,Double>, Double> {
+
+    	private K srcVertexId;
+
+    	public InitVerticesMapper(K srcId) {
+    		this.srcVertexId = srcId;
+    	}
+    		
+		public Double map(Vertex<K, Double> value) {
+			if (value.f0.equals(srcVertexId)) {
+				return 0.0;
+			}
+			else {
+				return Double.MAX_VALUE;
+			}
+		}
+    }
 
     /**
      * Function that updates the value of a vertex by picking the minimum distance from all
incoming messages.
      *
      * @param <K>
      */
-    @SuppressWarnings("serial")
     public static final class VertexDistanceUpdater<K extends Comparable<K> &
Serializable>
             extends VertexUpdateFunction<K, Double, Double> {
 
@@ -76,7 +84,6 @@ public class SingleSourceShortestPaths<K extends Comparable<K>
& Serializable> i
      *
      * @param <K>
      */
-    @SuppressWarnings("serial")
     public static final class MinDistanceMessenger<K extends Comparable<K> &
Serializable>
             extends MessagingFunction<K, Double, Double, Double> {
 
@@ -87,32 +94,4 @@ public class SingleSourceShortestPaths<K extends Comparable<K>
& Serializable> i
             }
         }
     }
-
-    private static final class SelectVertex<K extends Comparable<K> & Serializable>
-            implements FilterFunction<Vertex<K, Double>> {
-        private K id;
-
-        public SelectVertex(K id) {
-            this.id = id;
-        }
-
-        @Override
-        public boolean filter(Vertex<K, Double> vertex) throws Exception {
-            return vertex.getId().equals(id);
-        }
-    }
-
-    private static final class InitSrcVertex<K extends Comparable<K> & Serializable>
-            implements MapFunction<Tuple2<Vertex<K, Double>, Vertex<K,Double>>,
Vertex<K,Double>> {
-
-        @Override
-        public Vertex<K, Double> map(Tuple2<Vertex<K, Double>, Vertex<K,
Double>> vertexVertexTuple2) throws Exception {
-            if(vertexVertexTuple2.f0.f0.equals(vertexVertexTuple2.f1.f0)) {
-                return new Vertex<>(vertexVertexTuple2.f0.f0, 0.0);
-            } else {
-                return new Vertex<>(vertexVertexTuple2.f1.f0, Double.MAX_VALUE);
-            }
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f41b9efa/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
index a08cd85..896d73d 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java
@@ -75,11 +75,11 @@ public class TestMapVertices extends JavaProgramTestBase {
 				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env));
 				
-				DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Long,
Long>() {
-					public Long map(Long value) throws Exception {
-						return value+1;
+				DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long,
Long>, Long>() {
+					public Long map(Vertex<Long, Long> value) throws Exception {
+						return value.getValue()+1;
 					}
-				});
+				}).getVertices();
 				
 				mappedVertices.writeAsCsv(resultPath);
 				env.execute();
@@ -98,31 +98,30 @@ public class TestMapVertices extends JavaProgramTestBase {
 				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env));
 				
-				DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Long,
String>() {
-					public String map(Long value) throws Exception {
+				DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long,
Long>, String>() {
+					public String map(Vertex<Long, Long> vertex) throws Exception {
 						String stringValue;
-						if (value == 1) {
+						if (vertex.getValue() == 1) {
 							stringValue = "one";
 						}
-						else if (value == 2) {
+						else if (vertex.getValue() == 2) {
 							stringValue = "two";
 						}
-						else if (value == 3) {
+						else if (vertex.getValue() == 3) {
 							stringValue = "three";
 						}
-						else if (value == 4) {
+						else if (vertex.getValue() == 4) {
 							stringValue = "four";
 						}
-						else if (value == 5) {
+						else if (vertex.getValue() == 5) {
 							stringValue = "five";
 						}
 						else {
 							stringValue = "";
 						}
-						
 						return stringValue;
 					}
-				});
+				}).getVertices();
 				
 				mappedVertices.writeAsCsv(resultPath);
 				env.execute();
@@ -141,13 +140,13 @@ public class TestMapVertices extends JavaProgramTestBase {
 				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env));
 				
-				DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new
MapFunction<Long, Tuple1<Long>>() {
-					public Tuple1<Long> map(Long value) throws Exception {
+				DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new
MapFunction<Vertex<Long, Long>, Tuple1<Long>>() {
+					public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
 						Tuple1<Long> tupleValue = new Tuple1<Long>();
-						tupleValue.setFields(value);
+						tupleValue.setFields(vertex.getValue());
 						return tupleValue;
 					}
-				});
+				}).getVertices();
 				
 				mappedVertices.writeAsCsv(resultPath);
 				env.execute();
@@ -166,13 +165,13 @@ public class TestMapVertices extends JavaProgramTestBase {
 				Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
 						TestGraphUtils.getLongLongEdgeData(env));
 				
-				DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new
MapFunction<Long, DummyCustomType>() {
-					public DummyCustomType map(Long value) throws Exception {
+				DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new
MapFunction<Vertex<Long, Long>, DummyCustomType>() {
+					public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
 						DummyCustomType dummyValue = new DummyCustomType();
-						dummyValue.setIntField(value.intValue());						
+						dummyValue.setIntField(vertex.getValue().intValue());						
 						return dummyValue;
 					}
-				});
+				}).getVertices();
 				
 				mappedVertices.writeAsCsv(resultPath);
 				env.execute();
@@ -192,14 +191,14 @@ public class TestMapVertices extends JavaProgramTestBase {
 						TestGraphUtils.getLongLongEdgeData(env));
 				
 				DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices
= graph.mapVertices(
-						new MapFunction<Long, DummyCustomParameterizedType<Double>>() {
-					public DummyCustomParameterizedType<Double> map(Long value) throws Exception {
+						new MapFunction<Vertex<Long, Long>, DummyCustomParameterizedType<Double>>()
{
+					public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex)
throws Exception {
 						DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-						dummyValue.setIntField(value.intValue());
-						dummyValue.setTField(new Double(value));						
+						dummyValue.setIntField(vertex.getValue().intValue());
+						dummyValue.setTField(new Double(vertex.getValue()));						
 						return dummyValue;
 					}
-				});
+				}).getVertices();
 				
 				mappedVertices.writeAsCsv(resultPath);
 				env.execute();
@@ -214,5 +213,5 @@ public class TestMapVertices extends JavaProgramTestBase {
 			}
 		}
 	}
-	
+
 }


Mime
View raw message