flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1514) [Gelly] Add a Gather-Sum-Apply iteration method
Date Tue, 24 Feb 2015 14:29:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334920#comment-14334920
] 

ASF GitHub Bot commented on FLINK-1514:
---------------------------------------

Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255193
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply
iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long,
Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input)
{
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> &
Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    --- End diff --
    
    Agreed! Where should the `Triplet` class be implemented?


> [Gelly] Add a Gather-Sum-Apply iteration method
> -----------------------------------------------
>
>                 Key: FLINK-1514
>                 URL: https://issues.apache.org/jira/browse/FLINK-1514
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: Daniel Bali
>
> This will be a method that implements the GAS computation model, but without the "scatter"
step. The phases can be mapped into the following steps inside a delta iteration:
> gather: a map on each < srcVertex, edge, trgVertex > that produces a partial value
> sum: a reduce that combines the partial values
> apply: join with vertex set to update the vertex values using the results of sum and
the previous state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message