flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [1/2] flink git commit: [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
Date Tue, 17 Mar 2015 16:30:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9077a53bf -> a1daadcba


[FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9db170fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9db170fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9db170fc

Branch: refs/heads/master
Commit: 9db170fc31af7a7ddfe6e77d55006efbefd91800
Parents: 9077a53
Author: andralungu <lungu.andra@gmail.com>
Authored: Sat Mar 7 17:17:28 2015 +0100
Committer: Vasia Kalavri <vasia@apache.org>
Committed: Tue Mar 17 18:04:48 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 28 ++++------
 .../flink/graph/example/GraphMetrics.java       | 10 ++--
 .../apache/flink/graph/library/PageRank.java    | 13 +++++
 .../apache/flink/graph/utils/GraphUtils.java    | 57 --------------------
 .../flink/graph/validation/GraphValidator.java  |  3 +-
 .../validation/InvalidVertexIdsValidator.java   |  8 ++-
 .../flink/graph/test/WeaklyConnectedITCase.java |  8 +--
 .../test/operations/GraphCreationITCase.java    |  8 +--
 .../test/operations/GraphOperationsITCase.java  |  4 +-
 9 files changed, 41 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f7c21d0..91620cc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -50,7 +50,6 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -293,7 +292,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * 
 	 * @return true if the Graph is valid.
 	 */
-	public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
+	public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
 		return validator.validate(this);
 	}
 
@@ -877,17 +876,17 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * @return Singleton DataSet containing the vertex count
+	 * @return a long integer representing the number of vertices
 	 */
-	public DataSet<Integer> numberOfVertices() {
-		return GraphUtils.count(vertices, context);
+	public long numberOfVertices() throws Exception {
+		return vertices.count();
 	}
 
 	/**
-	 * @return Singleton DataSet containing the edge count
+	 * @return a long integer representing the number of edges
 	 */
-	public DataSet<Integer> numberOfEdges() {
-		return GraphUtils.count(edges, context);
+	public long numberOfEdges() throws Exception {
+		return edges.count();
 	}
 
 	/**
@@ -927,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *            the maximum number of iterations for the inner delta iteration
 	 * @return true if the graph is weakly connected.
 	 */
-	public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
+	public Boolean isWeaklyConnected(int maxIterations) throws Exception {
 		// first, convert to an undirected graph
 		Graph<K, VV, EV> graph = this.getUndirected();
 
@@ -948,9 +947,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 				.with(new VertexWithNewComponentJoin<K>());
 
 		DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
-		DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup(new
EmitFirstReducer<K>()),
-				context).map(new CheckIfOneComponentMapper());
-		return result;
+		return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
 	}
 
 	private static final class DuplicateVertexIDMapper<K> implements MapFunction<K,
Tuple2<K, K>> {
@@ -983,13 +980,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer,
Boolean> {
-		@Override
-		public Boolean map(Integer n) {
-			return (n == 1);
-		}
-	}
-
 	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index a5ddf2a..1977255 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -63,17 +63,17 @@ public class GraphMetrics implements ProgramDescription {
 		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env),
env);
 		
 		/** get the number of vertices **/
-		DataSet<Integer> numVertices = graph.numberOfVertices();
+		long numVertices = graph.numberOfVertices();
 		
 		/** get the number of edges **/
-		DataSet<Integer> numEdges = graph.numberOfEdges();
+		long numEdges = graph.numberOfEdges();
 		
 		/** compute the average node degree **/
 		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
 
 		DataSet<Double> avgNodeDegree = verticesWithDegrees
 				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
-				.withBroadcastSet(numVertices, "numberOfVertices");
+				.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
 		
 		/** find the vertex with the maximum in-degree **/
 		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -88,8 +88,8 @@ public class GraphMetrics implements ProgramDescription {
 		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
 		
 		/** print the results **/
-		ExampleUtils.printResult(numVertices, "Total number of vertices");
-		ExampleUtils.printResult(numEdges, "Total number of edges");
+		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
+		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
 		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
 		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
 		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index e06e64f..3e6610e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,7 +20,11 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
+<<<<<<< HEAD
 import org.apache.flink.api.java.DataSet;
+=======
+import org.apache.flink.api.java.ExecutionEnvironment;
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods
use DS.count()
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -47,10 +51,19 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
 				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
+<<<<<<< HEAD
 
 		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
 		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
 
+=======
+		try {
+			iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
+					ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods
use DS.count()
 		return network.runVertexCentricIteration(iteration);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
deleted file mode 100644
index aba1c14..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-
-@SuppressWarnings("serial")
-public class GraphUtils {
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public static DataSet<Integer> count(DataSet set, ExecutionEnvironment env) {
-		List<Integer> list = new ArrayList<Integer>();
-		list.add(0);
-		DataSet<Integer> initialCount = env.fromCollection(list);
-		return set.map(new OneMapper()).union(initialCount)
-				.reduce(new AddOnesReducer()).first(1);
-	}
-
-	private static final class OneMapper<T extends Tuple> implements
-			MapFunction<T, Integer> {
-		@Override
-		public Integer map(T o) throws Exception {
-			return 1;
-		}
-	}
-
-	private static final class AddOnesReducer implements
-			ReduceFunction<Integer> {
-		@Override
-		public Integer reduce(Integer one, Integer two) throws Exception {
-			return one + two;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 04883be..339e38d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.validation;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 
 /**
@@ -34,6 +33,6 @@ import org.apache.flink.graph.Graph;
 public abstract class GraphValidator<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable>
 		implements Serializable {
 
-	public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
+	public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index b043f3c..aeca482 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
@@ -39,18 +38,17 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
 	 * also exist in the vertex input set.
 	 * 
-	 * @return a singleton DataSet<Boolean> stating whether a graph is valid
+	 * @return a Boolean stating whether a graph is valid
 	 *         with respect to its vertex ids.
 	 */
 	@Override
-	public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
+	public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
 		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
 				.flatMap(new MapEdgeIds<K, EV>()).distinct();
 		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
 				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
 
-		return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
-				graph.getContext()).map(new InvalidIdsMap());
+		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
 	}
 
 	private static final class MapEdgeIds<K extends Comparable<K> & Serializable,
EV extends Serializable>

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
index 1628952..9db449e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
@@ -62,7 +62,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "true\n";
@@ -78,7 +78,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "false\n";
@@ -94,7 +94,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "true\n";
@@ -110,7 +110,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "false\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index dc0e5d2..3fe69fd 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -123,9 +123,9 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long,
Long>());
+		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
 
-		result.writeAsText(resultPath);
+		env.fromElements(result).writeAsText(resultPath);
 		env.execute();
 
 		expectedResult = "true\n";
@@ -141,8 +141,8 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long,
Long>());
-		result.writeAsText(resultPath);
+		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+		env.fromElements(result).writeAsText(resultPath);
 		env.execute();
 
 		expectedResult = "false\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index e7f067a..6210f43 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -180,7 +180,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfVertices().writeAsText(resultPath);
+		env.fromElements(graph.numberOfVertices()).writeAsText(resultPath);
 
 		env.execute();
 		expectedResult = "5";
@@ -195,7 +195,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfEdges().writeAsText(resultPath);
+		env.fromElements(graph.numberOfEdges()).writeAsText(resultPath);
 
 		env.execute();
 		expectedResult = "7";


Mime
View raw message