flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [4/7] flink git commit: [FLINK-3207] [gelly] add a pregel SSSP example with combiner
Date Mon, 21 Mar 2016 18:11:14 GMT
[FLINK-3207] [gelly] add a pregel SSSP example with combiner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77eb4f0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77eb4f0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77eb4f0c

Branch: refs/heads/master
Commit: 77eb4f0c41fe45d3ad4efc8c54f21394cd973cb5
Parents: c5ffb5d
Author: vasia <vasia@apache.org>
Authored: Tue Feb 2 12:41:51 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Mon Mar 21 19:10:29 2016 +0100

----------------------------------------------------------------------
 .../examples/GSASingleSourceShortestPaths.java  |   3 +-
 .../SingleSourceShortestPathsITCase.java        |   8 +
 .../main/java/org/apache/flink/graph/Graph.java |  45 +++++
 .../apache/flink/graph/example/PregelSSSP.java  | 194 +++++++++++++++++++
 4 files changed, 249 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
index 1732016..35f07b0 100755
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -36,7 +36,8 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
  * This example shows how to use Gelly's Gather-Sum-Apply iterations.
  * 
  * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}.

+ * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}
+ * and for a vertex-centric implementation, see {@link PregelSSSP}. 
  *
  * The input file is a plain text file and must be formatted as follows:
  * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
index faf92c0..258ed16 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
@@ -24,6 +24,7 @@ import com.google.common.io.Files;
 import org.apache.flink.graph.examples.GSASingleSourceShortestPaths;
 import org.apache.flink.graph.examples.SingleSourceShortestPaths;
 import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.graph.example.PregelSSSP;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.junit.After;
@@ -75,6 +76,13 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase
{
         expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }
 
+    @Test
+    public void testPregelSSSPExample() throws Exception {
+        final String[] programArgs = {SingleSourceShortestPathsData.SRC_VERTEX_ID + "", edgesPath, resultPath, 10 + ""};
+        PregelSSSP.main(programArgs);
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
     @After
     public void after() throws Exception {
         TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index ce8e895..f2b5b22 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -51,6 +51,10 @@ import org.apache.flink.graph.gsa.GSAConfiguration;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.pregel.ComputeFunction;
+import org.apache.flink.graph.pregel.MessageCombiner;
+import org.apache.flink.graph.pregel.VertexCentricConfiguration;
+import org.apache.flink.graph.pregel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
@@ -1680,6 +1684,47 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	 * Runs a vertex-centric iteration on the graph.
+	 * No configuration is supplied: {@code null} is forwarded as the configuration.
+	 *
+	 * @param computeFunction the vertex compute function
+	 * @param combiner an optional message combiner
+	 * @param maximumNumberOfIterations maximum number of iterations to perform
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
+	 */
+	public <M> Graph<K, VV, EV> runVertexCentricIteration(
+			ComputeFunction<K, VV, EV, M> computeFunction,
+			MessageCombiner<K, M> combiner, int maximumNumberOfIterations) {
+		final VertexCentricConfiguration noConfiguration = null;
+		return runVertexCentricIteration(
+				computeFunction, combiner, maximumNumberOfIterations, noConfiguration);
+	}
+
+	/**
+	 * Runs a vertex-centric iteration on the graph with configuration options.
+	 *
+	 * @param computeFunction the vertex compute function
+	 * @param combiner an optional message combiner; it is handed to the iteration
+	 *        together with the compute function
+	 * @param maximumNumberOfIterations maximum number of iterations to perform
+	 * @param parameters the iteration configuration parameters
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
+	 */
+	public <M> Graph<K, VV, EV> runVertexCentricIteration(
+			ComputeFunction<K, VV, EV, M> computeFunction,
+			MessageCombiner<K, M> combiner, int maximumNumberOfIterations,
+			VertexCentricConfiguration parameters) {
+		// forward the combiner: the 3-argument withEdges(...) would silently drop it
+		VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges(
+				edges, computeFunction, combiner, maximumNumberOfIterations);
+		iteration.configure(parameters);
+		DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
+		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+	}
+
+	/**
 	 * @param algorithm the algorithm to run on the Graph
 	 * @param <T> the return type
 	 * @return the result of the graph algorithm

http://git-wip-us.apache.org/repos/asf/flink/blob/77eb4f0c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
new file mode 100644
index 0000000..93cc360
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/PregelSSSP.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.pregel.ComputeFunction;
+import org.apache.flink.graph.pregel.MessageCombiner;
+import org.apache.flink.graph.pregel.MessageIterator;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's Vertex-Centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a scatter-gather implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}
+ * and for a gather-sum-apply implementation see {@link GSASingleSourceShortestPaths}.  
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class PregelSSSP implements ProgramDescription {
+
+	/**
+	 * Runs the example: builds the graph from the edges, executes the vertex-centric
+	 * iteration and either writes the resulting vertices to a CSV file or prints them.
+	 *
+	 * @param args empty, or: source vertex id, input edges path, output path, num iterations
+	 */
+	public static void main(String[] args) throws Exception {
+		final boolean parametersAccepted = parseParameters(args);
+		if (!parametersAccepted) {
+			return;
+		}
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// InitVertices supplies the initial (infinite) value of every vertex
+		final Graph<Long, Double, Double> graph =
+				Graph.fromDataSet(getEdgesDataSet(env), new InitVertices(), env);
+
+		// run the vertex-centric iteration and keep only the resulting vertices
+		final DataSet<Vertex<Long, Double>> distances = graph.runVertexCentricIteration(
+				new SSSPComputeFunction(srcVertexId), new SSSPCombiner(), maxIterations)
+				.getVertices();
+
+		if (!fileOutput) {
+			distances.print();
+			return;
+		}
+		distances.writeAsCsv(outputPath, "\n", ",");
+		env.execute("Pregel Single Source Shortest Paths Example");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/** Maps every vertex id to an infinite initial distance. */
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double> {
+		public Double map(Long vertexId) { return Double.valueOf(Double.POSITIVE_INFINITY); }
+	}
+
+	/**
+	 * The compute function for SSSP: a vertex takes the smallest distance offered to it and,
+	 * only if that improves its current value, stores it and messages the targets of its edges.
+	 */
+	@SuppressWarnings("serial")
+	public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
+
+		private final long srcId;
+
+		public SSSPComputeFunction(long src) {
+			this.srcId = src;
+		}
+
+		public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
+			// the source vertex is at distance zero from itself
+			double best = vertex.getId().equals(srcId) ? 0d : Double.POSITIVE_INFINITY;
+			for (Double candidate : messages) {
+				best = Math.min(best, candidate);
+			}
+			if (!(best < vertex.getValue())) {
+				return; // no improvement: keep the value, send nothing
+			}
+			setNewVertexValue(best);
+			for (Edge<Long, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), best + edge.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Message combiner for SSSP: collapses the messages destined to one target
+	 * vertex into a single message that carries the minimum distance.
+	 */
+	@SuppressWarnings("serial")
+	public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
+
+		public void combineMessages(MessageIterator<Double> messages) {
+			// infinity is the identity of min; it is what gets sent when there are no messages
+			double smallest = Double.POSITIVE_INFINITY;
+			for (Double candidate : messages) {
+				smallest = Math.min(smallest, candidate);
+			}
+			sendCombinedMessage(smallest);
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: PregelSSSP <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing Pregel Single Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: PregelSSSP <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	/** Returns the built-in default edges, or the edges parsed from the tab-delimited input file. */
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (!fileOutput) {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+		return env.readCsvFile(edgesInputPath)
+				.lineDelimiter("\n")
+				.fieldDelimiter("\t")
+				.ignoreComments("%")
+				.types(Long.class, Long.class, Double.class)
+				.map(new Tuple3ToEdgeMap<Long, Double>());
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex-centric Single Source Shortest Paths"; // short name of this example
+	}
+}
\ No newline at end of file


Mime
View raw message