Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B3A4617C8E for ; Sat, 18 Apr 2015 11:49:07 +0000 (UTC) Received: (qmail 54784 invoked by uid 500); 18 Apr 2015 11:49:07 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 54738 invoked by uid 500); 18 Apr 2015 11:49:07 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 54729 invoked by uid 99); 18 Apr 2015 11:49:07 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 Apr 2015 11:49:07 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: message received from 54.164.171.186 which is an MX secondary for issues@flink.apache.org) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 Apr 2015 11:49:03 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 4350B453E7 for ; Sat, 18 Apr 2015 11:48:42 +0000 (UTC) Received: (qmail 53817 invoked by uid 99); 18 Apr 2015 11:48:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 Apr 2015 11:48:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A41EFE022F; Sat, 18 Apr 2015 11:48:41 +0000 (UTC) From: vasia To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: 
In-Reply-To: Subject: [GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e... Content-Type: text/plain Message-Id: <20150418114841.A41EFE022F@git1-us-west.apache.org> Date: Sat, 18 Apr 2015 11:48:41 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643459 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.IncrementalSSSPData; +import org.apache.flink.graph.spargel.IterationConfiguration; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * Incremental Single Sink Shortest Paths Example. + * + * The program takes as input the resulted graph after a SSSP computation, + * an edge to be removed and the initial graph(i.e. before SSSP was computed). + * + * - If the removed edge does not belong to the SP-graph, no computation is necessary. + * The edge is simply removed from the graph. + * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge, + * potentially require re-computation. + * When the edge is removed, v checks if it has another out-going SP-edge. + * If yes, no further computation is required. + * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF. + * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message. + * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge. + * If not, it invalidates its current value and propagates the INVALIDATE message. 
+ * The propagation stops when a vertex with an alternative shortest path is reached + * or when we reach a vertex with no SP-in-neighbors. + * + * Usage: IncrementalSSSPExample <vertex path> <edge path> <edges in SSSP> + * <edge to be removed> <result path> <number of iterations>
+ * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData} + */ +@SuppressWarnings("serial") +public class IncrementalSSSPExample implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> vertices = getVerticesDataSet(env); + + DataSet> edges = getEdgesDataSet(env); + + DataSet> edgesInSSSP = getEdgesinSSSPDataSet(env); + + Edge edgeToBeRemoved = getEdgeToBeRemoved(); + + Graph graph = Graph.fromDataSet(vertices, edges, env); + + // Assumption: all minimum weight paths are kept + Graph ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env); + + // remove the edge + graph.removeEdge(edgeToBeRemoved); + + // configure the iteration + IterationConfiguration parameters = new IterationConfiguration(); + + if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) { + + parameters.setDirection(EdgeDirection.IN); + parameters.setOptDegrees(true); + + // run the vertex centric iteration to propagate info + Graph result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(), + new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters); + + DataSet> resultedVertices = result.getVertices(); + + // Emit results + if(fileOutput) { + resultedVertices.writeAsCsv(outputPath, "\n", ","); + } else { + resultedVertices.print(); + } + + env.execute("Incremental SSSP Example"); + } else { + // print the vertices + if(fileOutput) { + vertices.writeAsCsv(outputPath, "\n", ","); + } else { + vertices.print(); + } + + env.execute("Incremental SSSP Example"); + } + } + + @Override + public String getDescription() { + return "Incremental Single Sink Shortest Paths Example"; + } + + // ****************************************************************************************************************** + // IncrementalSSSP METHODS 
+ // ****************************************************************************************************************** + + /** + * Function that verifies whether the edge to be removed is part of the SSSP or not. + * If it is, the src vertex will be invalidated. + * + * @param edgeToBeRemoved + * @param edgesInSSSP + * @return + */ + private static boolean isInSSSP(final Edge edgeToBeRemoved, DataSet> edgesInSSSP) throws Exception { + + return edgesInSSSP.filter(new FilterFunction>() { + @Override + public boolean filter(Edge edge) throws Exception { + return edge.equals(edgeToBeRemoved); + } + }).count() > 0; + } + + public static final class VertexDistanceUpdater extends VertexUpdateFunction { + + @Override + public void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception { + if (inMessages.hasNext()) { + Long outDegree = vertex.getOutDegree() - 1; + // check if the vertex has another SP-Edge + if (outDegree > 0) { + // there is another shortest path from the source to this vertex + } else { + // set own value to infinity + setNewVertexValue(Double.MAX_VALUE); + } + } + } + } + + public static final class InvalidateMessenger extends MessagingFunction { + + private Edge edgeToBeRemoved; + + public InvalidateMessenger(Edge edgeToBeRemoved) { + this.edgeToBeRemoved = edgeToBeRemoved; + } + + @Override + public void sendMessages(Vertex vertex) throws Exception { + + + if(getSuperstepNumber() == 1) { + if(vertex.getId().equals(edgeToBeRemoved.getSource())) { + // activate the edge target + sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE); + } + } + + if(getSuperstepNumber() > 1) { + // invalidate all edges + for(Edge edge : getEdges()) { + sendMessageTo(edge.getSource(), Double.MAX_VALUE); + } + } + } + } + + // ****************************************************************************************************************** + // UTIL METHODS + // 
****************************************************************************************************************** + + private static boolean fileOutput = false; + + private static String verticesInputPath = null; + + private static String edgesInputPath = null; + + private static String edgesInSSSPInputPath = null; + + private static String edgeToBeRemoved = null; + + private static String outputPath = null; + + private static int maxIterations = 5; + + private static boolean parseParameters(String[] args) { + if (args.length > 0) { + if (args.length == 6) { + fileOutput = true; + verticesInputPath = args[0]; + edgesInputPath = args[1]; + edgesInSSSPInputPath = args[2]; + edgeToBeRemoved = args[3]; + outputPath = args[4]; + maxIterations = Integer.parseInt(args[5]); + } else { + System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("See the documentation for the correct format of input files."); + System.out.println("Usage: IncrementalSSSP "); + + return false; + } + } + return true; + } + + private static DataSet> getVerticesDataSet(ExecutionEnvironment env) { + if (fileOutput) { + return env.readCsvFile(verticesInputPath) + .lineDelimiter("\n") + .types(Long.class, Double.class) + .map(new MapFunction, Vertex>() { --- End diff -- we have a Tuple2ToVertex utility mapper :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---