Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7C318200B2B for ; Tue, 28 Jun 2016 21:44:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7959B160A56; Tue, 28 Jun 2016 19:44:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CDF59160A28 for ; Tue, 28 Jun 2016 21:44:41 +0200 (CEST) Received: (qmail 14886 invoked by uid 500); 28 Jun 2016 19:44:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 14877 invoked by uid 99); 28 Jun 2016 19:44:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 19:44:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3E2FE08FE; Tue, 28 Jun 2016 19:44:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Message-Id: <22442df0f0dd45f689cfdc1a3874fb44@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3277] [gelly] Use Value types in Gelly API Date: Tue, 28 Jun 2016 19:44:37 +0000 (UTC) archived-at: Tue, 28 Jun 2016 19:44:43 -0000 Repository: flink Updated Branches: refs/heads/master 10898a90f -> 40749ddcd [FLINK-3277] [gelly] Use Value types in Gelly API This closes #1671 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40749ddc Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/40749ddc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40749ddc Branch: refs/heads/master Commit: 40749ddcd73c4634d81c2153f64e8934d519be3d Parents: 10898a9 Author: Greg Hogan Authored: Thu Feb 18 10:40:39 2016 -0500 Committer: Greg Hogan Committed: Tue Jun 28 15:04:16 2016 -0400 ---------------------------------------------------------------------- docs/apis/batch/libs/gelly.md | 24 +++---- .../flink/graph/examples/GraphMetrics.java | 15 +++-- .../graph/scala/examples/GraphMetrics.scala | 2 +- .../org/apache/flink/graph/scala/Graph.scala | 22 +++--- .../main/java/org/apache/flink/graph/Graph.java | 19 ++++-- .../apache/flink/graph/library/GSAPageRank.java | 10 +-- .../apache/flink/graph/library/PageRank.java | 10 +-- .../graph/spargel/ScatterGatherIteration.java | 70 ++++++++++---------- .../graph/generator/CompleteGraphTest.java | 8 +-- .../flink/graph/generator/CycleGraphTest.java | 8 +-- .../flink/graph/generator/EmptyGraphTest.java | 4 +- .../flink/graph/generator/GridGraphTest.java | 8 +-- .../graph/generator/HypercubeGraphTest.java | 8 +-- .../flink/graph/generator/PathGraphTest.java | 8 +-- .../graph/generator/SingletonEdgeGraphTest.java | 8 +-- .../flink/graph/generator/StarGraphTest.java | 8 +-- .../graph/test/operations/DegreesITCase.java | 25 +++---- .../operations/DegreesWithExceptionITCase.java | 11 +-- 18 files changed, 138 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/docs/apis/batch/libs/gelly.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index b4b78cc..d8692d6 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -346,13 +346,13 @@ DataSet getVertexIds() DataSet> getEdgeIds() // get a DataSet of pairs for all vertices -DataSet> inDegrees() 
+DataSet> inDegrees() // get a DataSet of pairs for all vertices -DataSet> outDegrees() +DataSet> outDegrees() // get a DataSet of pairs for all vertices, where degree is the sum of in- and out- degrees -DataSet> getDegrees() +DataSet> getDegrees() // get the number of vertices long numberOfVertices() @@ -381,13 +381,13 @@ getVertexIds: DataSet[K] getEdgeIds: DataSet[(K, K)] // get a DataSet of pairs for all vertices -inDegrees: DataSet[(K, Long)] +inDegrees: DataSet[(K, LongValue)] // get a DataSet of pairs for all vertices -outDegrees: DataSet[(K, Long)] +outDegrees: DataSet[(K, LongValue)] // get a DataSet of pairs for all vertices, where degree is the sum of in- and out- degrees -getDegrees: DataSet[(K, Long)] +getDegrees: DataSet[(K, LongValue)] // get the number of vertices numberOfVertices: Long @@ -519,13 +519,13 @@ Note that if the input dataset contains a key multiple times, all Gelly join met {% highlight java %} Graph network = ... -DataSet> vertexOutDegrees = network.outDegrees(); +DataSet> vertexOutDegrees = network.outDegrees(); // assign the transition probabilities as the edge weights Graph networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, - new VertexJoinFunction() { - public Double vertexJoin(Double vertexValue, Long inputValue) { - return vertexValue / inputValue; + new VertexJoinFunction() { + public Double vertexJoin(Double vertexValue, LongValue inputValue) { + return vertexValue / inputValue.getValue(); } }); {% endhighlight %} @@ -535,10 +535,10 @@ Graph networkWithWeights = network.joinWithEdgesOnSource(v {% highlight scala %} val network: Graph[Long, Double, Double] = ... 
-val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees +val vertexOutDegrees: DataSet[(Long, LongValue)] = network.outDegrees // assign the transition probabilities as the edge weights -val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2) +val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: LongValue) => v1 / v2.getValue) {% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java index 9058538..e7b47bf 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.examples; -import org.apache.flink.graph.examples.utils.ExampleUtils; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; @@ -27,6 +26,8 @@ import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.examples.utils.ExampleUtils; +import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; /** @@ -66,7 +67,7 @@ public class GraphMetrics implements ProgramDescription { long numEdges = graph.numberOfEdges(); /** compute the average node degree **/ - DataSet> verticesWithDegrees = graph.getDegrees(); + DataSet> verticesWithDegrees = 
graph.getDegrees(); DataSet avgNodeDegree = verticesWithDegrees .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices)); @@ -96,7 +97,7 @@ public class GraphMetrics implements ProgramDescription { } @SuppressWarnings("serial") - private static final class AvgNodeDegreeMapper implements MapFunction, Double> { + private static final class AvgNodeDegreeMapper implements MapFunction, Double> { private long numberOfVertices; @@ -104,14 +105,14 @@ public class GraphMetrics implements ProgramDescription { this.numberOfVertices = numberOfVertices; } - public Double map(Tuple2 sumTuple) { - return (double) (sumTuple.f1 / numberOfVertices) ; + public Double map(Tuple2 sumTuple) { + return (double) (sumTuple.f1.getValue() / numberOfVertices) ; } } @SuppressWarnings("serial") - private static final class ProjectVertexId implements MapFunction, Long> { - public Long map(Tuple2 value) { return value.f0; } + private static final class ProjectVertexId implements MapFunction, Long> { + public Long map(Tuple2 value) { return value.f0; } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala index f9fa82d..ebf43d4 100644 --- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala @@ -61,7 +61,7 @@ object GraphMetrics { /** compute the average node degree **/ val verticesWithDegrees = graph.getDegrees - val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / 
numVertices).toDouble) + val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble) /** find the vertex with the maximum in-degree **/ val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 165f6c2..f7e13ba 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -18,24 +18,22 @@ package org.apache.flink.graph.scala -import org.apache.flink.util.Preconditions import org.apache.flink.api.common.functions.{FilterFunction, MapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{tuple => jtuple} import org.apache.flink.api.scala._ import org.apache.flink.graph._ import org.apache.flink.graph.asm.translate.TranslateFunction -import org.apache.flink.graph.validation.GraphValidator import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction} +import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration} import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction} +import org.apache.flink.graph.validation.GraphValidator +import org.apache.flink.types.{LongValue, NullValue} +import org.apache.flink.util.Preconditions import org.apache.flink.{graph => jg} import _root_.scala.collection.JavaConverters._ import _root_.scala.reflect.ClassTag -import org.apache.flink.types.NullValue -import 
org.apache.flink.graph.pregel.ComputeFunction -import org.apache.flink.graph.pregel.MessageCombiner -import org.apache.flink.graph.pregel.VertexCentricConfiguration object Graph { @@ -803,8 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * * @return A DataSet of Tuple2 */ - def inDegrees(): DataSet[(K, Long)] = { - wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) + def inDegrees(): DataSet[(K, LongValue)] = { + wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) } /** @@ -812,8 +810,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * * @return A DataSet of Tuple2 */ - def outDegrees(): DataSet[(K, Long)] = { - wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) + def outDegrees(): DataSet[(K, LongValue)] = { + wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) } /** @@ -821,8 +819,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * * @return A DataSet of Tuple2 */ - def getDegrees(): DataSet[(K, Long)] = { - wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) + def getDegrees(): DataSet[(K, LongValue)] = { + wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 3dbb9c4..fe59283 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -61,6 +61,7 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap; import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import 
org.apache.flink.graph.utils.VertexToTuple2Map; import org.apache.flink.graph.validation.GraphValidator; +import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; @@ -867,25 +868,31 @@ public class Graph { * * @return A DataSet of {@code Tuple2} */ - public DataSet> outDegrees() { + public DataSet> outDegrees() { return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup()); } private static final class CountNeighborsCoGroup - implements CoGroupFunction, Edge, Tuple2> { + implements CoGroupFunction, Edge, Tuple2> { + private LongValue degree = new LongValue(); + + private Tuple2 vertexDegree = new Tuple2<>(null, degree); + @SuppressWarnings("unused") public void coGroup(Iterable> vertex, Iterable> outEdges, - Collector> out) { + Collector> out) { long count = 0; for (Edge edge : outEdges) { count++; } + degree.setValue(count); Iterator> vertexIterator = vertex.iterator(); if(vertexIterator.hasNext()) { - out.collect(new Tuple2(vertexIterator.next().f0, count)); + vertexDegree.f0 = vertexIterator.next().f0; + out.collect(vertexDegree); } else { throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); } @@ -897,7 +904,7 @@ public class Graph { * * @return A DataSet of {@code Tuple2} */ - public DataSet> inDegrees() { + public DataSet> inDegrees() { return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup()); } @@ -907,7 +914,7 @@ public class Graph { * * @return A DataSet of {@code Tuple2} */ - public DataSet> getDegrees() { + public DataSet> getDegrees() { return outDegrees().union(inDegrees()).groupBy(0).sum(1); } http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java index 324f9c3..ef39395 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java @@ -29,6 +29,7 @@ import org.apache.flink.graph.gsa.GSAConfiguration; import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.types.LongValue; /** * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration. @@ -56,8 +57,7 @@ public class GSAPageRank implements GraphAlgorithm> run(Graph network) throws Exception { - - DataSet> vertexOutDegrees = network.outDegrees(); + DataSet> vertexOutDegrees = network.outDegrees(); Graph networkWithWeights = network .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); @@ -114,10 +114,10 @@ public class GSAPageRank implements GraphAlgorithm { + private static final class InitWeights implements EdgeJoinFunction { - public Double edgeJoin(Double edgeValue, Long inputValue) { - return edgeValue / (double) inputValue; + public Double edgeJoin(Double edgeValue, LongValue inputValue) { + return edgeValue / (double) inputValue.getValue(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index f83b05b..2f1b03b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -29,6 +29,7 @@ import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.ScatterGatherConfiguration; import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.types.LongValue; /** * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration. @@ -56,8 +57,7 @@ public class PageRank implements GraphAlgorithm> run(Graph network) throws Exception { - - DataSet> vertexOutDegrees = network.outDegrees(); + DataSet> vertexOutDegrees = network.outDegrees(); Graph networkWithWeights = network .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); @@ -118,10 +118,10 @@ public class PageRank implements GraphAlgorithm { + private static final class InitWeights implements EdgeJoinFunction { - public Double edgeJoin(Double edgeValue, Long inputValue) { - return edgeValue / (double) inputValue; + public Double edgeJoin(Double edgeValue, LongValue inputValue) { + return edgeValue / (double) inputValue.getValue(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java index 165ef1e..fc5c210 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java @@ -309,28 +309,28 @@ public class ScatterGatherIteration } @SuppressWarnings("serial") - private static final class 
VertexUpdateUdfVVWithDegrees extends VertexUpdateUdf, Message> { + private static final class VertexUpdateUdfVVWithDegrees extends VertexUpdateUdf, Message> { - private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction, Message> vertexUpdateFunction, - TypeInformation>> resultType) { + private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction, Message> vertexUpdateFunction, + TypeInformation>> resultType) { super(vertexUpdateFunction, resultType); } @Override - public void coGroup(Iterable> messages, Iterable>> vertex, - Collector>> out) throws Exception { + public void coGroup(Iterable> messages, Iterable>> vertex, + Collector>> out) throws Exception { - final Iterator>> vertexIter = vertex.iterator(); + final Iterator>> vertexIter = vertex.iterator(); if (vertexIter.hasNext()) { - Vertex> vertexWithDegrees = vertexIter.next(); + Vertex> vertexWithDegrees = vertexIter.next(); @SuppressWarnings("unchecked") Iterator> downcastIter = (Iterator>) (Iterator) messages.iterator(); messageIter.setSource(downcastIter); - vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1); - vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2); + vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); + vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out); vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter); @@ -420,7 +420,7 @@ public class ScatterGatherIteration @SuppressWarnings("serial") private static final class MessagingUdfWithEVsVVWithDegrees - extends MessagingUdfWithEdgeValues, VV, Message, EV> { + extends MessagingUdfWithEdgeValues, VV, Message, EV> { private Vertex nextVertex = new Vertex(); @@ -430,19 +430,19 @@ public class ScatterGatherIteration } @Override - public void coGroup(Iterable> edges, Iterable>> state, + public void coGroup(Iterable> edges, Iterable>> state, Collector> out) throws Exception { - final Iterator>> 
stateIter = state.iterator(); + final Iterator>> stateIter = state.iterator(); if (stateIter.hasNext()) { - Vertex> vertexWithDegrees = stateIter.next(); + Vertex> vertexWithDegrees = stateIter.next(); nextVertex.setField(vertexWithDegrees.f0, 0); nextVertex.setField(vertexWithDegrees.f1.f0, 1); - messagingFunction.setInDegree(vertexWithDegrees.f1.f1); - messagingFunction.setOutDegree(vertexWithDegrees.f1.f2); + messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue()); + messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue()); messagingFunction.set((Iterator) edges.iterator(), out, vertexWithDegrees.getId()); messagingFunction.sendMessages(nextVertex); @@ -505,13 +505,13 @@ public class ScatterGatherIteration * @return the messaging function */ private CoGroupOperator> buildMessagingFunctionVerticesWithDegrees( - DeltaIteration>, Vertex>> iteration, + DeltaIteration>, Vertex>> iteration, TypeInformation> messageTypeInfo, int whereArg, int equalToArg, DataSet numberOfVertices) { // build the messaging function (co group) CoGroupOperator> messages; - MessagingUdfWithEdgeValues, VV, Message, EV> messenger = + MessagingUdfWithEdgeValues, VV, Message, EV> messenger = new MessagingUdfWithEVsVVWithDegrees(messagingFunction, messageTypeInfo); messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg) @@ -626,34 +626,34 @@ public class ScatterGatherIteration this.updateFunction.setOptDegrees(this.configuration.isOptDegrees()); - DataSet> inDegrees = graph.inDegrees(); - DataSet> outDegrees = graph.outDegrees(); + DataSet> inDegrees = graph.inDegrees(); + DataSet> outDegrees = graph.outDegrees(); - DataSet> degrees = inDegrees.join(outDegrees).where(0).equalTo(0) - .with(new FlatJoinFunction, Tuple2, Tuple3>() { + DataSet> degrees = inDegrees.join(outDegrees).where(0).equalTo(0) + .with(new FlatJoinFunction, Tuple2, Tuple3>() { @Override - public void join(Tuple2 first, Tuple2 second, Collector> out) { - out.collect(new 
Tuple3(first.f0, first.f1, second.f1)); + public void join(Tuple2 first, Tuple2 second, Collector> out) { + out.collect(new Tuple3(first.f0, first.f1, second.f1)); } }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1"); - DataSet>> verticesWithDegrees = initialVertices + DataSet>> verticesWithDegrees = initialVertices .join(degrees).where(0).equalTo(0) - .with(new FlatJoinFunction, Tuple3, Vertex>>() { + .with(new FlatJoinFunction, Tuple3, Vertex>>() { @Override - public void join(Vertex vertex, Tuple3 degrees, - Collector>> out) throws Exception { + public void join(Vertex vertex, Tuple3 degrees, + Collector>> out) throws Exception { - out.collect(new Vertex>(vertex.getId(), - new Tuple3(vertex.getValue(), degrees.f1, degrees.f2))); + out.collect(new Vertex>(vertex.getId(), + new Tuple3(vertex.getValue(), degrees.f1, degrees.f2))); } }).withForwardedFieldsFirst("f0"); // add type info - TypeInformation>> vertexTypes = verticesWithDegrees.getType(); + TypeInformation>> vertexTypes = verticesWithDegrees.getType(); - final DeltaIteration>, Vertex>> iteration = + final DeltaIteration>, Vertex>> iteration = verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0); setUpIteration(iteration); @@ -673,11 +673,11 @@ public class ScatterGatherIteration } @SuppressWarnings({ "unchecked", "rawtypes" }) - VertexUpdateUdf, Message> updateUdf = + VertexUpdateUdf, Message> updateUdf = new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes); // build the update function (co group) - CoGroupOperator>> updates = + CoGroupOperator>> updates = messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); if (this.configuration != null && this.configuration.isOptNumVertices()) { @@ -687,9 +687,9 @@ public class ScatterGatherIteration configureUpdateFunction(updates); return iteration.closeWith(updates, updates).map( - new MapFunction>, Vertex>() { + new MapFunction>, Vertex>() { - public Vertex map(Vertex> 
vertex) { + public Vertex map(Vertex> vertex) { return new Vertex(vertex.getId(), vertex.getValue().f0); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java index af47fdc..6c0e094 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java @@ -56,10 +56,10 @@ extends AbstractGraphTest { assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(vertexCount - 1, minInDegree); assertEquals(vertexCount - 1, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java index fb6799b..ec36aa7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java @@ -55,10 +55,10 @@ extends AbstractGraphTest { assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(2 * vertexCount, graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(2, minInDegree); assertEquals(2, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java index bc1ef77..d4a524f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java @@ -54,8 +54,8 @@ extends AbstractGraphTest { assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(0, graph.numberOfEdges()); - long maxInDegree = 
graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(0, maxInDegree); assertEquals(0, maxOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java index f3fa7db..9606d1a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java @@ -63,10 +63,10 @@ extends AbstractGraphTest { assertEquals(2*3*5*7, graph.numberOfVertices()); assertEquals(7 * 2*3*5*7, graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(7, minInDegree); assertEquals(7, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java index 12024be..d723ecb 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java @@ -57,10 +57,10 @@ extends AbstractGraphTest { assertEquals(1L << dimensions, graph.numberOfVertices()); assertEquals(dimensions * (1L << dimensions), graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(dimensions, minInDegree); assertEquals(dimensions, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java index b8a409f..3c3ce8c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java +++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java @@ -55,10 +55,10 @@ extends AbstractGraphTest { assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(1, minInDegree); assertEquals(1, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java index 3877717..44a4d99 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java @@ -56,10 +56,10 @@ extends AbstractGraphTest { assertEquals(2 * vertexPairCount, graph.numberOfVertices()); assertEquals(2 * vertexPairCount, graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(1, minInDegree); assertEquals(1, minOutDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java index 2b090db..c656cfb 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java @@ -57,10 +57,10 @@ extends AbstractGraphTest { assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); - long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; - long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; - long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; - long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue(); + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue(); + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); assertEquals(1, minInDegree); assertEquals(1, minOutDegree); 
http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java index b2744f9..db2ca0d 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,8 +51,8 @@ public class DegreesITCase extends MultipleProgramsTestBase { Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> data =graph.outDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,2\n" + @@ -76,8 +77,8 @@ public class DegreesITCase extends MultipleProgramsTestBase { - DataSet<Tuple2<Long, Long>> data =graph.outDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,3\n" + "2,1\n" + @@ -99,8 +100,8 @@ public class DegreesITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> data =graph.inDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,1\n" + "2,1\n" + @@ -120,8 +121,8 @@ public
class DegreesITCase extends MultipleProgramsTestBase { Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - DataSet<Tuple2<Long, Long>> data =graph.inDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,0\n" + "2,1\n" + @@ -142,8 +143,8 @@ public class DegreesITCase extends MultipleProgramsTestBase { Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> data =graph.getDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.getDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,3\n" + "2,2\n" + @@ -164,8 +165,8 @@ public class DegreesITCase extends MultipleProgramsTestBase { Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> data =graph.outDegrees(); - List<Tuple2<Long, Long>> result= data.collect(); + DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees(); + List<Tuple2<Long, LongValue>> result = data.collect(); expectedResult = "1,2\n" + "2,1\n" + http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java index d79768f..551a97b 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.test.TestGraphUtils; import
org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.types.LongValue; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -81,7 +82,7 @@ public class DegreesWithExceptionITCase { TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env); try { - graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>()); env.execute(); fail("graph.outDegrees() did not fail."); @@ -105,7 +106,7 @@ public class DegreesWithExceptionITCase { TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env); try { - graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>()); env.execute(); fail("graph.inDegrees() did not fail."); @@ -129,7 +130,7 @@ public class DegreesWithExceptionITCase { TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env); try { - graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>()); env.execute(); fail("graph.getDegrees() did not fail."); @@ -153,7 +154,7 @@ public class DegreesWithExceptionITCase { TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env); try { - graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>()); env.execute(); fail("graph.getDegrees() did not fail."); @@ -177,7 +178,7 @@ public class DegreesWithExceptionITCase { TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env); try { - graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>()); env.execute(); fail("graph.getDegrees() did not fail.");