flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [1/2] flink git commit: [FLINK-1576] [gelly] improvements to the gelly examples. Updated GraphMetrics, MusicProfiles and PageRank to run with and without parameters. Added input descriptions to LabelPropagation and SSSP. Fixed some minor issues in the SS
Date Tue, 17 Mar 2015 14:32:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1c50d87c1 -> 9077a53bf


[FLINK-1576] [gelly] improvements to the gelly examples.
Updated GraphMetrics, MusicProfiles and PageRank to run with and without parameters.
Added input descriptions to LabelPropagation and SSSP.
Fixed some minor issues in the SSSP example.
Fixed a bug in MusicProfiles that wasn't generating the user-user graph properly.
Changed the PageRank library method to initialize the vertex ranks.

This closes #470


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e795c437
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e795c437
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e795c437

Branch: refs/heads/master
Commit: e795c437e27205273018bfc9a43da38bb1701116
Parents: 1c50d87
Author: vasia <vasilikikalavri@gmail.com>
Authored: Mon Mar 9 23:47:31 2015 +0100
Committer: Vasia Kalavri <vasia@apache.org>
Committed: Tue Mar 17 16:02:23 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/GraphMetrics.java       |  80 +++++++++++---
 .../graph/example/LabelPropagationExample.java  |  20 ++--
 .../flink/graph/example/MusicProfiles.java      | 105 ++++++++++++++++---
 .../flink/graph/example/PageRankExample.java    |  77 +++++++++++---
 .../SingleSourceShortestPathsExample.java       |  78 +++++++-------
 .../utils/SingleSourceShortestPathsData.java    |  21 +---
 .../apache/flink/graph/library/PageRank.java    |  21 +++-
 .../example/LabelPropagationExampleITCase.java  |  64 +++++------
 .../SingleSourceShortestPathsITCase.java        |   8 +-
 9 files changed, 326 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 33f8f1a..a5ddf2a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
@@ -42,25 +43,24 @@ import org.apache.flink.types.NullValue;
  * - average node degree
  * - the vertex ids with the max/min in- and out-degrees
  *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * "<sourceVertexID>\t<targetVertexID>".
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
  */
 public class GraphMetrics implements ProgramDescription {
 
-	static final int NUM_VERTICES = 100;
-	static final long SEED = 9876;
-	
-
-	@Override
-	public String getDescription() {
-		return "Graph Metrics Example";
-	}
-
 	public static void main(String[] args) throws Exception {
 
+		if (!parseParameters(args)) {
+			return;
+		}
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		/** create a random graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(ExampleUtils
-				.getRandomEdges(env, NUM_VERTICES), env);
+		/** create the graph **/
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
 		
 		/** get the number of vertices **/
 		DataSet<Integer> numVertices = graph.numberOfVertices();
@@ -98,7 +98,7 @@ public class GraphMetrics implements ProgramDescription {
 
 		env.execute();
 	}
-	
+
 	@SuppressWarnings("serial")
 	private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
 
@@ -120,4 +120,58 @@ public class GraphMetrics implements ProgramDescription {
 	private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
 		public Long map(Tuple2<Long, Long> value) { return value.f0; }
 	}
+
+	@Override
+	public String getDescription() {
+		return "Graph Metrics Example";
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String edgesInputPath = null;
+
+	static final int NUM_VERTICES = 100;
+
+	static final long SEED = 9876;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 1) {
+				System.err.println("Usage: GraphMetrics <input edges>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgesInputPath = args[0];
+		} else {
+			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: GraphMetrics <input edges>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(Long.class, Long.class).map(
+							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
+									return new Edge<Long, NullValue>(value.f0, value.f1, 
+											NullValue.getInstance());
+								}
+					});
+		} else {
+			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index e399b3f..c43dbaa 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -38,6 +39,14 @@ import org.apache.flink.util.Collector;
  * the most frequent label among their neighbors. The algorithm converges when
  * no vertex changes value or the maximum number of iterations have been
  * reached.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs
+ * in the following format:"<sourceVertexID>\t<targetVertexID>".
+ *
+ * The vertices input file is expected to contain one vertex per line, with long IDs
+ * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
  */
 public class LabelPropagationExample implements ProgramDescription {
 
@@ -109,15 +118,10 @@ public class LabelPropagationExample implements ProgramDescription {
 
 		if (fileOutput) {
 			return env.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
-						@Override
-						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-							return new Vertex<Long, Long>(value.f0, value.f1);
-						}
-					});
+					.map(new Tuple2ToVertexMap<Long, Long>());
 		}
 
 		return env.generateSequence(1, numVertices).map(
@@ -133,7 +137,7 @@ public class LabelPropagationExample implements ProgramDescription {
 
 		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
 					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 948ac5b..9b18623 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -55,23 +55,33 @@ public class MusicProfiles implements ProgramDescription {
 	 * users that listen to the same song are connected. Finally, we use the
 	 * graph API to run the label propagation community detection algorithm on
 	 * the similarity graph.
+	 *
+	 * The triplets input is expected to be given as one triplet per line,
+	 * in the following format: "<userID>\t<songID>\t<playcount>".
+	 *
+	 * The mismatches input file is expected to contain one mismatch record per line,
+	 * in the following format:
+	 * "ERROR: <songID trackID> song_title"
+	 *
+	 * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
 	 */
 	public static void main(String[] args) throws Exception {
 
+		if (!parseParameters(args)) {
+			return;
+		}
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final int numIterations = 10;
 
 		/**
-		 * Read the user-song-play triplets The format is
-		 * <userID>\t<songID>\t<playcount>
+		 * Read the user-song-play triplets.
 		 */
-		DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env);
+		DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env);
 
 		/**
-		 * Read the mismatches dataset and extract the songIDs The format is
-		 * "ERROR: <songID trackID> song_title"
+		 * Read the mismatches dataset and extract the songIDs
 		 */
-		DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
+		DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds());
 
 		/**
 		 * Filter out the mismatches from the triplets dataset
@@ -93,7 +103,11 @@ public class MusicProfiles implements ProgramDescription {
 				.reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
 				.filter(new FilterSongNodes());
 
-		usersWithTopTrack.print();
+		if (fileOutput) {
+			usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t");
+		} else {
+			usersWithTopTrack.print();
+		}
 
 		/**
 		 * Create a user-user similarity graph, based on common songs, i.e. two
@@ -126,10 +140,14 @@ public class MusicProfiles implements ProgramDescription {
 							public Long map(Tuple2<Long, Long> value) {
 								return value.f1;
 							}
-						}).run(new LabelPropagation<String>(numIterations))
+						}).run(new LabelPropagation<String>(maxIterations))
 				.getVertices();
 
-		verticesWithCommunity.print();
+		if (fileOutput) {
+			verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
+		} else {
+			verticesWithCommunity.print();
+		}
 
 		env.execute();
 	}
@@ -191,8 +209,10 @@ public class MusicProfiles implements ProgramDescription {
 				listeners.add(edge.getSource());
 			}
 			for (int i = 0; i < listeners.size() - 1; i++) {
-				out.collect(new Edge<String, NullValue>(listeners.get(i),
-						listeners.get(i + 1), NullValue.getInstance()));
+				for (int j = i + 1; j < listeners.size(); j++) {
+					out.collect(new Edge<String, NullValue>(listeners.get(i),
+							listeners.get(j), NullValue.getInstance()));
+				}
 			}
 		}
 	}
@@ -213,4 +233,65 @@ public class MusicProfiles implements ProgramDescription {
 	public String getDescription() {
 		return "Music Profiles Example";
 	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String userSongTripletsInputPath = null;
+
+	private static String mismatchesInputPath = null;
+
+	private static String topTracksOutputPath = null;
+
+	private static String communitiesOutputPath = null;
+
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 5) {
+				System.err.println("Usage: MusicProfiles <input user song triplets path>" +
+						" <input song mismatches path> <output top tracks path> "
+						+ "<output communities path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			userSongTripletsInputPath = args[0];
+			mismatchesInputPath = args[1];
+			topTracksOutputPath = args[2];
+			communitiesOutputPath = args[3];
+			maxIterations = Integer.parseInt(args[4]);
+		} else {
+			System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: MusicProfiles <input user song triplets path>" +
+					" <input song mismatches path> <output top tracks path> "
+					+ "<output communities path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(userSongTripletsInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(String.class, String.class, Integer.class);
+		} else {
+			return MusicProfilesData.getUserSongTriplets(env);
+		}
+	}
+
+	private static DataSet<String> getMismatchesData(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readTextFile(mismatchesInputPath);
+		} else {
+			return MusicProfilesData.getMismatches(env);
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index d279aa5..cc0b702 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -28,20 +28,38 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.util.Collector;
 
+/**
+ * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and double
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 10 vertices
+ * and random edge weights.
+ *
+ */
 public class PageRankExample implements ProgramDescription {
 
 	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if(!parseParameters(args)) {
+			return;
+		}
 
-		DataSet<Vertex<Long, Double>> pages = getPagesDataSet(env);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
 
-		Graph<Long, Double, Double> network = Graph.fromDataSet(pages, links, env);
+		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
+
+			public Double map(Long value) throws Exception {
+				return 1.0;
+			}
+		}, env);
 
 		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
 
@@ -58,34 +76,63 @@ public class PageRankExample implements ProgramDescription {
 				new PageRank<Long>(DAMPENING_FACTOR, maxIterations))
 				.getVertices();
 
-		pageRanks.print();
+		if (fileOutput) {
+			pageRanks.writeAsCsv(outputPath, "\n", "\t");
+		} else {
+			pageRanks.print();
+		}
 
 		env.execute();
 	}
 
 	@Override
 	public String getDescription() {
-		return "PageRank";
+		return "PageRank example";
 	}
 
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
 	private static final double DAMPENING_FACTOR = 0.85;
 	private static long numPages = 10;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
 	private static int maxIterations = 10;
 
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Double>> getPagesDataSet(ExecutionEnvironment env) {
-		return env.generateSequence(1, numPages).map(
-				new MapFunction<Long, Vertex<Long, Double>>() {
-					@Override
-					public Vertex<Long, Double> map(Long l) throws Exception {
-						return new Vertex<Long, Double>(l, 1.0 / numPages);
-					}
-				});
-
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing PageRank example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRank <input edges path> <output path> <num iterations>");
+		}
+		return true;
 	}
 
 	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		}
+
 		return env.generateSequence(1, numPages).flatMap(
 				new FlatMapFunction<Long, Edge<Long, Double>>() {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 6c85397..ff523ce 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -30,8 +29,20 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.library.SingleSourceShortestPaths;
 
+/**
+ * This example implements the Single Source Shortest Paths algorithm,
+ * using a vertex-centric iteration.
+ *
+ * The input file is expected to contain one edge per line, with long IDs
+ * and double weights, in CSV format:
+ * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
+ *
+ */
 public class SingleSourceShortestPathsExample implements ProgramDescription {
 
+	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -40,15 +51,19 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
-
 		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, Double>() {
+
+					public Double map(Long value) {
+						return Double.MAX_VALUE;
+					}
+		}, env);
 
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
-				.run(new SingleSourceShortestPaths<Long>(srcVertexId,
-						maxIterations)).getVertices();
+				.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations))
+				.getVertices();
 
 		// emit result
 		if (fileOutput) {
@@ -71,9 +86,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 	private static boolean fileOutput = false;
 
-	private static Long srcVertexId = null;
-
-	private static String verticesInputPath = null;
+	private static Long srcVertexId = 1L;
 
 	private static String edgesInputPath = null;
 
@@ -83,41 +96,27 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 	private static boolean parseParameters(String[] args) {
 
-		if (args.length > 0) {
-			if (args.length == 5) {
-				fileOutput = true;
-				srcVertexId = Long.parseLong(args[0]);
-				verticesInputPath = args[1];
-				edgesInputPath = args[2];
-				outputPath = args[3];
-				maxIterations = Integer.parseInt(args[4]);
-			} else {
+		if(args.length > 0) {
+			if(args.length != 4) {
 				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-						" <input vertices path> <input edges path> <output path> <num iterations>");
+						" <input edges path> <output path> <num iterations>");
 				return false;
 			}
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(verticesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
 
-						@Override
-						public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
-							return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
-						}
-					});
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
 		} else {
-			System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-					" <input vertices path> <input edges path> <output path> <num iterations>");
-			return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
+			System.out.println("Executing Single Source Shortest Paths example "
+					+ "with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+					" <input edges path> <output path> <num iterations>");
 		}
+		return true;
 	}
 
 	@SuppressWarnings("serial")
@@ -125,6 +124,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 		if (fileOutput) {
 			return env.readCsvFile(edgesInputPath)
 					.lineDelimiter("\n")
+					.fieldDelimiter("\t")
 					.types(Long.class, Long.class, Double.class)
 					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
 
@@ -134,8 +134,6 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 						}
 					});
 		} else {
-			System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-					" <input vertices path> <input edges path> <output path> <num iterations>");
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index 7e5445f..67cd150 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -21,33 +21,16 @@ package org.apache.flink.graph.example.utils;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class SingleSourceShortestPathsData {
 
-	public static final int NUM_VERTICES = 5;
-
 	public static final Long SRC_VERTEX_ID = 1L;
 
-	public static final String VERTICES = "1,1.0\n" + "2,2.0\n" + "3,3.0\n" + "4,4.0\n" + "5,5.0";
-
-	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
-
-		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
-		vertices.add(new Vertex<Long, Double>(1L, 1.0));
-		vertices.add(new Vertex<Long, Double>(2L, 2.0));
-		vertices.add(new Vertex<Long, Double>(3L, 3.0));
-		vertices.add(new Vertex<Long, Double>(4L, 4.0));
-		vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-		return env.fromCollection(vertices);
-	}
-
-	public static final String EDGES = "1,2,12.0\n" + "1,3,13.0\n" + "2,3,23.0\n" + "3,4,34.0\n" + "3,5,35.0\n" + 
-					"4,5,45.0\n" + "5,1,51.0";
+	public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
+					"4\t5\t45.0\n" + "5\t1\t51.0";
 
 	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index e43ee51..e06e64f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -41,9 +42,15 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+
+		DataSet<Integer> numberOfVertices = network.numberOfVertices();
+
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
 				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", network.numberOfVertices());
+
+		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
+		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
+
 		return network.runVertexCentricIteration(iteration);
 	}
 
@@ -55,7 +62,6 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class VertexRankUpdater<K extends Comparable<K> & Serializable>
 			extends VertexUpdateFunction<K, Double, Double> {
 
-		
 		private final double beta;
 		private int numVertices;
 		
@@ -91,8 +97,19 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class RankMessenger<K extends Comparable<K> & Serializable>
 			extends MessagingFunction<K, Double, Double, Double> {
 
+		private int numVertices;
+
+		@Override
+		public void preSuperstep(){
+			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+		}
+
 		@Override
 		public void sendMessages(K vertexId, Double newRank) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				newRank = 1.0 / numVertices;
+			}
 			for (Edge<K, Double> edge : getOutgoingEdges()) {
 				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
index 0e1810f..3298b7f 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
@@ -61,20 +61,20 @@ public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
 		 * Test one iteration of label propagation example with a simple graph
 		 */
 
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 30\n" +
-				"4 40\n" +
-				"5 40\n" +
-				"6 40\n" +
-				"7 70\n";
-
-		final String edges = "1 3\n" +
-				"2 3\n" +
-				"4 7\n" +
-				"5 7\n" +
-				"6 7\n" +
-				"7 3\n";
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	30\n" +
+				"4	40\n" +
+				"5	40\n" +
+				"6	40\n" +
+				"7	70\n";
+
+		final String edges = "1	3\n" +
+				"2	3\n" +
+				"4	7\n" +
+				"5	7\n" +
+				"6	7\n" +
+				"7	3\n";
 
 		String verticesPath = createTempFile(vertices);
 		String edgesPath = createTempFile(edges);
@@ -96,24 +96,24 @@ public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
 		 * Test the label propagation example where a tie must be broken
 		 */
 
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 10\n" +
-				"4 10\n" +
-				"5 0\n" +
-				"6 20\n" +
-				"7 20\n" +
-				"8 20\n" +
-				"9 20\n";
-
-		final String edges = "1 5\n" +
-				"2 5\n" +
-				"3 5\n" +
-				"4 5\n" +
-				"6 5\n" +
-				"7 5\n" +
-				"8 5\n" +
-				"9 5\n";
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	10\n" +
+				"4	10\n" +
+				"5	0\n" +
+				"6	20\n" +
+				"7	20\n" +
+				"8	20\n" +
+				"9	20\n";
+
+		final String edges = "1	5\n" +
+				"2	5\n" +
+				"3	5\n" +
+				"4	5\n" +
+				"6	5\n" +
+				"7	5\n" +
+				"8	5\n" +
+				"9	5\n";
 
 		String verticesPath = createTempFile(vertices);
 		String edgesPath = createTempFile(edges);

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index 9f563da..aa2d6f0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -36,8 +36,6 @@ import java.io.File;
 @RunWith(Parameterized.class)
 public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
 
-    private String verticesPath;
-
     private String edgesPath;
 
     private String resultPath;
@@ -54,20 +52,16 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
     @Before
     public void before() throws Exception {
         resultPath = tempFolder.newFile().toURI().toString();
-        File verticesFile = tempFolder.newFile();
-        Files.write(SingleSourceShortestPathsData.VERTICES, verticesFile, Charsets.UTF_8);
 
         File edgesFile = tempFolder.newFile();
         Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
-
-        verticesPath = verticesFile.toURI().toString();
         edgesPath = edgesFile.toURI().toString();
     }
 
     @Test
     public void testSSSPExample() throws Exception {
         SingleSourceShortestPathsExample.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                verticesPath, edgesPath, resultPath, SingleSourceShortestPathsData.NUM_VERTICES + ""});
+                edgesPath, resultPath, 10 + ""});
         expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }
 


Mime
View raw message