Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 736B7181CC for ; Wed, 7 Oct 2015 16:14:27 +0000 (UTC) Received: (qmail 3275 invoked by uid 500); 7 Oct 2015 16:14:27 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 3228 invoked by uid 500); 7 Oct 2015 16:14:27 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 3216 invoked by uid 99); 7 Oct 2015 16:14:27 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Oct 2015 16:14:27 +0000 Date: Wed, 7 Oct 2015 16:14:27 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-2561) Sync Gelly Java and Scala APIs MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/FLINK-2561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947117#comment-14947117 ] ASF GitHub Bot commented on FLINK-2561: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1211#discussion_r41410709 --- Diff: flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.example; + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.api.common.functions.MapFunction +import scala.collection.JavaConversions._ +import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap +import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData +import org.apache.flink.graph.gsa.GatherFunction +import org.apache.flink.graph.gsa.Neighbor +import org.apache.flink.graph.gsa.SumFunction +import org.apache.flink.graph.gsa.ApplyFunction + +/** + * This example shows how to use Gelly's gather-sum-apply iterations. + * + * It is an implementation of the Single-Source-Shortest-Paths algorithm. + * + * The input file is a plain text file and must be formatted as follows: + * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: 1\t2\t0.1\n1\t3\t1.4\n defines two edges, + * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] + */ +object GSASingleSourceShortestPaths { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) + val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) + + // Execute the gather-sum-apply iteration + val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, + new UpdateDistance, maxIterations) + + // Extract the vertices as the result + val singleSourceShortestPaths = result.getVertices + + // emit result + if (fileOutput) { + singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") + env.execute("GSA Single Source Shortest Paths Example") + } else { + singleSourceShortestPaths.print() + } + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path UDFs + // -------------------------------------------------------------------------------------------- + + private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { + + override def map(id: Long) = { + if (id.equals(srcId)) { + 0.0 + } else { + Double.PositiveInfinity + } + } + } + + private final class CalculateDistances extends GatherFunction[Double, Double, Double] { + override def gather(neighbor: Neighbor[Double, Double]) = { + neighbor.getNeighborValue + neighbor.getEdgeValue + } + } + + private final class ChooseMinDistance extends SumFunction[Double, Double, Double] { + override def sum(newValue: Double, currentValue: Double) = { + Math.min(newValue, currentValue) + } + } + + private final class UpdateDistance extends ApplyFunction[Long, Double, Double] { + override def apply(newDistance: Double, oldDistance: Double) = { + if (newDistance < oldDistance) { + setResult(newDistance) + } + } + } + + // ************************************************************************** + // UTIL METHODS + // ************************************************************************** + + private var fileOutput = false + private var srcVertexId = 1L + private var edgesInputPath: String = null + private var outputPath: String = null + private var maxIterations = 5 + + private def parseParameters(args: Array[String]): Boolean = { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage: SingleSourceShortestPaths " + + " ") + false + } + fileOutput = true + srcVertexId = args(0).toLong + edgesInputPath = args(1) + outputPath = args(2) + maxIterations = (3).toInt + } else { + System.out.println("Executing Single Source Shortest Paths example " + + "with default parameters and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println("Usage: SingleSourceShortestPaths " + + " "); + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long, Double)](edgesInputPath, + lineDelimiter = "\n", + fieldDelimiter = "\t") + .map(new Tuple3ToEdgeMap[Long, Double]()) + } else { --- End diff -- off by 1 space? > Sync Gelly Java and Scala APIs > ------------------------------ > > Key: FLINK-2561 > URL: https://issues.apache.org/jira/browse/FLINK-2561 > Project: Flink > Issue Type: Task > Components: Gelly > Reporter: Vasia Kalavri > Assignee: Vasia Kalavri > Fix For: 0.10 > > > There is some functionality and tests missing from the Gelly Scala API. This should be added, together with documentation, a completeness test and some usage examples. -- This message was sent by Atlassian JIRA (v6.3.4#6332)