flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [21/50] [abbrv] flink git commit: [FLINK-1201] [gelly] Expose the full Vertex and Edge object in filter functions
Date Wed, 11 Feb 2015 10:49:23 GMT
[FLINK-1201] [gelly] Expose the full Vertex and Edge object in filter functions

Expose the full Vertex and Edge object in filter functions to allow filtering
by key value:

- subgraph()
- filterOnVertices()
- filterOnEdges()

fixes #56


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40457c29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40457c29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40457c29

Branch: refs/heads/master
Commit: 40457c29a20ab7f93542900d17e034774db20a12
Parents: d883c3a
Author: Carsten Brandt <mail@cebe.cc>
Authored: Mon Jan 12 20:59:13 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 46 ++----------
 .../flink/graph/test/TestGraphOperations.java   | 77 +++++++++++++-------
 2 files changed, 56 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 51b8c30..1990f26 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -346,10 +346,9 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
      * @param edgeFilter
      * @return
      */
-    public Graph<K, VV, EV> subgraph(FilterFunction<VV> vertexFilter, FilterFunction<EV>
edgeFilter) {
+    public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter,
FilterFunction<Edge<K, EV>> edgeFilter) {
 
-        DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(
-        		new ApplyVertexFilter<K, VV>(vertexFilter));
+        DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
 
         DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
         		.where(0).equalTo(0)
@@ -357,8 +356,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
         		.join(filteredVertices).where(1).equalTo(0)
         		.with(new ProjectEdge<K, VV, EV>());
 
-        DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(
-        		new ApplyEdgeFilter<K, EV>(edgeFilter));
+        DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
 
         return new Graph<K, VV, EV>(filteredVertices, filteredEdges, this.context);
     }
@@ -370,10 +368,9 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @param vertexFilter
 	 * @return
 	 */
-	public Graph<K, VV, EV> filterOnVertices(FilterFunction<VV> vertexFilter) {
+	public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>>
vertexFilter) {
 
-		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(
-				new ApplyVertexFilter<K, VV>(vertexFilter));
+		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
 
 		DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
 				.where(0).equalTo(0)
@@ -391,9 +388,8 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @param edgeFilter
 	 * @return
 	 */
-	public Graph<K, VV, EV> filterOnEdges(FilterFunction<EV> edgeFilter) {
-		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(
-				new ApplyEdgeFilter<K, EV>(edgeFilter));
+	public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter)
{
+		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
 
 		return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context);
 	}
@@ -408,34 +404,6 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		}
     }
     
-    private static final class ApplyVertexFilter<K extends Comparable<K> & Serializable,

-    	VV extends Serializable> implements FilterFunction<Vertex<K, VV>> {
-
-    	private FilterFunction<VV> innerFilter;
-    	
-    	public ApplyVertexFilter(FilterFunction<VV> theFilter) {
-    		this.innerFilter = theFilter;
-    	}
-
-		public boolean filter(Vertex<K, VV> value) throws Exception {
-			return innerFilter.filter(value.f1);
-		}
-    	
-    }
-
-    private static final class ApplyEdgeFilter<K extends Comparable<K> & Serializable,

-		EV extends Serializable> implements FilterFunction<Edge<K, EV>> {
-
-    	private FilterFunction<EV> innerFilter;
-    	
-    	public ApplyEdgeFilter(FilterFunction<EV> theFilter) {
-    		this.innerFilter = theFilter;
-    	}    	
-        public boolean filter(Edge<K, EV> value) throws Exception {
-            return innerFilter.filter(value.f2);
-        }
-    }
-
     /**
      * Return the out-degree of all vertices in the graph
      * @return A DataSet of Tuple2<vertexId, outDegree>

http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
index 7dec548..cb285c0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
@@ -18,7 +18,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class TestGraphOperations extends JavaProgramTestBase {
 
-	private static int NUM_PROGRAMS = 9;
+	private static int NUM_PROGRAMS = 10;
 
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -109,14 +109,14 @@ public class TestGraphOperations extends JavaProgramTestBase {
 
 					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
 							TestGraphUtils.getLongLongEdgeData(env), env);
-					graph.subgraph(new FilterFunction<Long>() {
-									   public boolean filter(Long value) throws Exception {
-										   return (value > 2);
+					graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
+									   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+										   return (vertex.getValue() > 2);
 									   }
 								   },
-							new FilterFunction<Long>() {
-								public boolean filter(Long value) throws Exception {
-									return (value > 34);
+							new FilterFunction<Edge<Long, Long>>() {
+								public boolean filter(Edge<Long, Long> edge) throws Exception {
+									return (edge.getValue() > 34);
 								}
 							}).getEdges().writeAsCsv(resultPath);
 
@@ -126,6 +126,44 @@ public class TestGraphOperations extends JavaProgramTestBase {
 				}
 				case 4: {
 				/*
+				 * Test filterOnVertices:
+				 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+							TestGraphUtils.getLongLongEdgeData(env), env);
+					graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
+						public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+							return (vertex.getValue() > 2);
+						}
+					}).getEdges().writeAsCsv(resultPath);
+
+					env.execute();
+					return  "3,4,34\n" +
+							"3,5,35\n" +
+							"4,5,45\n";
+				}
+				case 5: {
+				/*
+				 * Test filterOnEdges:
+				 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+							TestGraphUtils.getLongLongEdgeData(env), env);
+					graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
+						public boolean filter(Edge<Long, Long> edge) throws Exception {
+							return (edge.getValue() > 34);
+						}
+					}).getEdges().writeAsCsv(resultPath);
+
+					env.execute();
+					return "3,5,35\n" +
+							"4,5,45\n" +
+							"5,1,51\n";
+				}
+				case 6: {
+				/*
 				 * Test numberOfVertices()
 				 */
 					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -137,7 +175,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 					env.execute();
 					return "5";
 				}
-				case 5: {
+				case 7: {
 				/*
 				 * Test numberOfEdges()
 				 */
@@ -150,7 +188,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 					env.execute();
 					return "7";
 				}
-				case 6: {
+				case 8: {
 				/*
 				 * Test getVertexIds()
 				 */
@@ -163,7 +201,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 					env.execute();
 					return "1\n2\n3\n4\n5\n";
 				}
-				case 7: {
+				case 9: {
 				/*
 				 * Test getEdgeIds()
 				 */
@@ -179,7 +217,7 @@ public class TestGraphOperations extends JavaProgramTestBase {
 							"3,5\n" + "4,5\n" +
 							"5,1\n";
 				}
-				case 8: {
+				case 10: {
 				/*
 				 * Test union()
 				 */
@@ -210,23 +248,6 @@ public class TestGraphOperations extends JavaProgramTestBase {
 							"5,1,51\n" +
 							"6,1,61\n";
 				}
-				case 9: {
-				/*
-				 * Test getDegrees() with disconnected data
-				 */
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					Graph<Long, NullValue, Long> graph = 
-							Graph.create(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-
-					graph.outDegrees().writeAsCsv(resultPath);
-					env.execute();
-					return "1,2\n" +
-							"2,1\n" +
-							"3,0\n" +
-							"4,1\n" +
-							"5,0\n";
-					}
 				default:
 					throw new IllegalArgumentException("Invalid program id");
 			}


Mime
View raw message