flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [4/5] flink git commit: [FLINK-1758] [gelly] Replaced groupReduce with reduce
Date Sun, 26 Apr 2015 12:30:45 GMT
[FLINK-1758] [gelly] Replaced groupReduce with reduce


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d015bb0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d015bb0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d015bb0f

Branch: refs/heads/master
Commit: d015bb0f6e9b6c8764cb34dc8085751596b8a901
Parents: 9de640a
Author: andralungu <lungu.andra@gmail.com>
Authored: Mon Apr 13 10:26:55 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Sun Apr 26 14:05:38 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 45 ++++++++---
 .../org/apache/flink/graph/EdgeDirection.java   |  6 +-
 .../main/java/org/apache/flink/graph/Graph.java | 12 +--
 .../apache/flink/graph/ReduceEdgesFunction.java |  3 +-
 .../flink/graph/ReduceNeighborsFunction.java    |  2 +-
 .../JaccardSimilarityMeasureExample.java        |  8 +-
 .../operations/ReduceOnEdgesMethodsITCase.java  | 78 ++++++--------------
 .../ReduceOnNeighborMethodsITCase.java          | 23 +++---
 8 files changed, 92 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 1a02e6d..6390d6f 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -266,10 +266,13 @@ Neighborhood Methods
 
 Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
 
-`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of
a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices.
The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values
`IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT`
will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
+`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of
a vertex,
+while `groupReduceOnNeighbors()` has access to both the neighboring edges and vertices. The
neighborhood scope
+is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`.
`IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going
edges (neighbors), while `ALL` will gather all edges (neighbors).
 
 The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more
values per vertex.
-When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should
be called as they are more efficient.
+
+When the user-defined function to be applied on the neighborhood is associative and commutative,
it is highly advised to use the `reduceOnEdges()` and `reduceOnNeighbors()` methods. These
methods exploit combiners internally, significantly improving performance.
 
 For example, assume that you want to select the minimum weight of all out-edges for each
vertex in the following graph:
 
@@ -286,22 +289,20 @@ DataSet<Tuple2<Long, Double>> minWeights = graph.groupReduceOnEdges(
 				new SelectMinWeight(), EdgeDirection.OUT);
 
 // user-defined function to select the minimum weight
-static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long,
Long, Long, Tuple2<Long, Long>> {
+static final class SelectMinWeight implements EdgesFunctionWithVertexValue<Long, Long,
Long, Tuple2<Long, Long>> {
 
 		@Override
 		public void iterateEdges(Vertex<Long, Long> v,
 				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>>
out) throws Exception {
 
-			long weight = Long.MAX_VALUE;
-			long minNeighborId = 0;
+			long minWeight = Long.MAX_VALUE;
 
 			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() < weight) {
-					weight = edge.getValue();
-					minNeighborId = edge.getTarget();
+				if (edge.getValue() < minWeight) {
+					minWeight = edge.getValue();
 				}
 			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
+			out.collect(new Tuple2<Long, Long>(v.getId(), minWeight));
 		}
 	}
 {% endhighlight %}
@@ -335,6 +336,32 @@ static final class SumValues implements ReduceNeighborsFunction<Long,
Long, Doub
     <img alt="reduceOnNeighbors Example" width="70%" src="img/gelly-reduceOnNeighbors.png"/>
 </p>
 
+The following code will collect the in-edges for each vertex and apply the `SumInNeighbors()`
user-defined function on each of the resulting neighborhoods:
+
+{% highlight java %}
+Graph<Long, Long, Double> graph = ...
+
+DataSet<Tuple2<Long, Long>> verticesWithSum =
+				graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+
+// user-defined function to sum up the in-neighbor values.
+static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long,
Long,
+	Tuple2<Long, Long>> {
+
+	@Override
+	public void iterateNeighbors(Vertex<Long, Long> vertex,
+			Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+			Collector<Tuple2<Long, Long>> out) throws Exception {
+
+		long sum = 0;
+		for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors)
{
+			sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+		}
+		out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+	}
+}
+{% endhighlight %}
+
 When the aggregation computation does not require access to the vertex value (for which the
aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction`
for the user-defined functions. When access to the vertex value is required, one should use
`EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
 
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
index 65d4098..0a055bb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
@@ -22,8 +22,10 @@ package org.apache.flink.graph;
  * The EdgeDirection is used to select a node's neighborhood
  * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
  * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
- * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
- * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and
+ * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
  * methods.
  */
 public enum EdgeDirection {

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 1325e0c..48d39b1 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -724,7 +724,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K,
VV, EV, T> edgesFunction,
-											 EdgeDirection direction) throws IllegalArgumentException {
+											EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -755,7 +755,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-											 EdgeDirection direction) throws IllegalArgumentException {
+											EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -1240,7 +1240,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K,
VV, EV, T> neighborsFunction,
-												 EdgeDirection direction) throws IllegalArgumentException {
+												EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1283,7 +1283,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV,
T> neighborsFunction,
-												 EdgeDirection direction) throws IllegalArgumentException {
+												EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1437,7 +1437,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
-									 EdgeDirection direction) throws IllegalArgumentException {
+									EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 			case IN:
 				// create <edge-sourceVertex> pairs
@@ -1504,7 +1504,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	 * @throws IllegalArgumentException
 	 */
 	public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
-								 EdgeDirection direction) throws IllegalArgumentException {
+								EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 			case IN:

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 0b5d2cf..53c7934 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.graph;
 
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.io.Serializable;
@@ -30,7 +31,7 @@ import java.io.Serializable;
  * @param <EV> the edge value type
  */
 public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
-		EV extends Serializable> {
+		EV extends Serializable> extends Function, Serializable {
 
 	Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge,
Tuple2<K, Edge<K, EV>> secondEdge);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index 50c0d35..f5e978f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -36,5 +36,5 @@ public interface ReduceNeighborsFunction <K extends Comparable<K>
& Serializable
 		EV extends Serializable> extends Function, Serializable {
 
 	Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K,
EV>, Vertex<K, VV>> firstNeighbor,
-														  Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
+														Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
index c81aeb3..2783a29 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
@@ -32,6 +32,7 @@ import org.apache.flink.graph.EdgesFunction;
 import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
 
 import java.util.HashSet;
 
@@ -68,7 +69,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription
{
 		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
 
 		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
-				graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+				graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
 
 		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors,
edges, env);
 
@@ -106,7 +107,8 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription
{
 	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long,
HashSet<Long>>> {
 
 		@Override
-		public Vertex<Long, HashSet<Long>> iterateEdges(Iterable<Tuple2<Long,
Edge<Long, Double>>> edges) throws Exception {
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
+														Collector<Vertex<Long, HashSet<Long>>> out) throws Exception
{
 
 			HashSet<Long> neighborsHashSet = new HashSet<Long>();
 			long vertexId = -1;
@@ -115,7 +117,7 @@ public class JaccardSimilarityMeasureExample implements ProgramDescription
{
 				neighborsHashSet.add(getNeighborID(edge));
 				vertexId = edge.f0;
 			}
-			return new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet);
+			out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index 2452cba..3ace49a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -78,9 +78,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase
{
 		env.execute();
 	
 		expectedResult = "1,2\n" +
-				"2,3\n" + 
+				"2,3\n" +
 				"3,4\n" +
-				"4,5\n" + 
+				"4,5\n" +
 				"5,1\n";
 	}
 
@@ -335,8 +335,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase
{
 	@Test
 	public void testLowestWeightOutNeighborNoValue() throws Exception {
 		/*
-		 * Get the lowest-weight out-neighbor
-		 * for each vertex
+		 * Get the lowest-weight out of all the out-neighbors
+		 * of each vertex
          */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),

@@ -347,33 +347,33 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase
{
 		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
 		env.execute();
 
-		expectedResult = "1,2\n" +
-				"2,3\n" + 
-				"3,4\n" +
-				"4,5\n" + 
-				"5,1\n";
+		expectedResult = "1,12\n" +
+				"2,23\n" +
+				"3,34\n" +
+				"4,45\n" +
+				"5,51\n";
 	}
 
 	@Test
 	public void testLowestWeightInNeighborNoValue() throws Exception {
 		/*
-		 * Get the lowest-weight in-neighbor
-		 * for each vertex
+		 * Get the lowest-weight out of all the in-neighbors
+		 * of each vertex
          */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),

 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+				graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
 		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
 		env.execute();
 
-		expectedResult = "1,5\n" +
-				"2,1\n" + 
-				"3,1\n" +
-				"4,3\n" + 
-				"5,3\n";
+		expectedResult = "1,51\n" +
+				"2,12\n" +
+				"3,13\n" +
+				"4,34\n" +
+				"5,35\n";
 	}
 
 	@Test
@@ -455,25 +455,6 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase
{
 	@SuppressWarnings("serial")
 	private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long,
Long> {
 
-//		@Override
-//		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-//								 Collector<Tuple2<Long, Long>> out) throws Exception {
-//
-//			long weight = Long.MIN_VALUE;
-//			long vertexId = -1;
-//			long i=0;
-//
-//			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-//				if (edge.f1.getValue() > weight) {
-//					weight = edge.f1.getValue();
-//				}
-//				if (i==0) {
-//					vertexId = edge.f0;
-//				} i++;
-//			}
-//			out.collect(new Tuple2<Long, Long>(vertexId, weight));
-//		}
-
 		@Override
 		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long,
Long>> firstEdge,
 														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
@@ -506,27 +487,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long,
Long, Tuple2<Long, Long>> {
+	private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction<Long,
Long> {
 
 		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() < weight) {
-					weight = edge.f1.getValue();
-					minNeighorId = edge.f1.getSource();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
+		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long,
Long>> firstEdge,
+														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
+			if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+				return firstEdge;
+			} else {
+				return secondEdge;
 			}
-			out.collect(new Tuple2<Long, Long>(vertexId, minNeighorId));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d015bb0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 5300d24..5f23569 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.test.operations;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -227,7 +226,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase
{
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+				graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
 		verticesWithSum.writeAsCsv(resultPath);
 		env.execute();
 	
@@ -536,15 +535,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase
{
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction<Long,
Long, Long> {
+	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long,
Long,
+			Tuple2<Long, Long>> {
 
 		@Override
-		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long,
Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
-																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>
secondNeighbor) {
-			long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() +
-					secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue();
-			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0,
firstNeighbor.f1,
-					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long,
Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator
=
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue() * next.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(next.f0, sum));
 		}
 	}
 


Mime
View raw message