flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] flink git commit: [FLINK-5909] [gelly] Interface for GraphAlgorithm results
Date Thu, 09 Mar 2017 15:11:25 GMT
Repository: flink
Updated Branches:
  refs/heads/pr3434 [created] 33cd97953


http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index a7ba00a..7d77541 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -25,16 +25,17 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.BinaryResult;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
@@ -240,7 +241,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0; 1")
+	@ForwardedFields("0; 1")
 	private static class VertexInverseLogDegree<T>
 	implements MapFunction<Vertex<T, LongValue>, Tuple3<T, LongValue, FloatValue>>
{
 		private Tuple3<T, LongValue, FloatValue> output = new Tuple3<>(null, null, new FloatValue());
@@ -260,13 +261,12 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		}
 	}
 
-
 	/**
 	 * @see JaccardIndex.GenerateGroupSpans
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0->1; 1->2 ; 2->3")
+	@ForwardedFields("0->1; 1->2 ; 2->3")
 	private static class GenerateGroupSpans<T>
 	implements GroupReduceFunction<Tuple3<T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
 		private IntValue groupSpansValue = new IntValue();
@@ -301,7 +301,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("1; 2; 3")
+	@ForwardedFields("1; 2; 3")
 	private static class GenerateGroups<T>
 	implements FlatMapFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
 		@Override
@@ -321,7 +321,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("3->2")
+	@ForwardedFields("3->2")
 	private static class GenerateGroupPairs<T extends CopyableValue<T>>
 	implements GroupReduceFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple3<T, T, FloatValue>> {
 		private Tuple3<T, T, FloatValue> output = new Tuple3<>();
@@ -392,7 +392,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0; 1")
+	@ForwardedFields("0; 1")
 	private static class ComputeScores<T>
 	extends RichGroupReduceFunction<Tuple3<T, T, FloatValue>, Result<T>> {
 		private float minimumScore;
@@ -442,12 +442,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the Adamic-Adar algorithm.
+	 * Wraps {@link Tuple3} to encapsulate results from the Adamic-Adar algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Edge<T, FloatValue> {
+	extends Tuple3<T, T, FloatValue>
+	implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
 		public static final int HASH_SEED = 0xe405f6d1;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
@@ -459,6 +460,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			f2 = new FloatValue();
 		}
 
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public T getVertexId1() {
+			return f1;
+		}
+
 		/**
 		 * Get the Adamic-Adar score, equal to the sum over common neighbors of
 		 * the inverse logarithm of degree
@@ -469,8 +480,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return f2;
 		}
 
-		public String toVerboseString() {
-			return "Vertex IDs: (" + f0 + ", " + f1
+		@Override
+		public String toPrintableString() {
+			return "Vertex IDs: (" + getVertexId0() + ", " + getVertexId1()
 				+ "), adamic-adar score: " + getAdamicAdarScore();
 		}
 
@@ -482,5 +494,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.hash(f2.getValue())
 				.hash();
 		}
+
+		@Override
+		public int compareTo(Result<T> o) {
+			return Float.compare(getAdamicAdarScore().getValue(), o.getAdamicAdarScore().getValue());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 11ec73d..3b36715 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -22,13 +22,15 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.BinaryResult;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
@@ -252,7 +254,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0->1; 1->2")
+	@ForwardedFields("0->1; 1->2")
 	private static class GenerateGroupSpans<T, ET>
 	implements GroupReduceFunction<Edge<T, Tuple2<ET, LongValue>>, Tuple4<IntValue, T, T, IntValue>> {
 		private final int groupSize;
@@ -301,7 +303,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @see GenerateGroupSpans
 	 */
-	@FunctionAnnotation.ForwardedFields("1; 2; 3")
+	@ForwardedFields("1; 2; 3")
 	private static class GenerateGroups<T>
 	implements FlatMapFunction<Tuple4<IntValue, T, T, IntValue>, Tuple4<IntValue, T, T, IntValue>> {
 		@Override
@@ -397,7 +399,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0; 1")
+	@ForwardedFields("0; 1")
 	private static class ComputeScores<T>
 	implements GroupReduceFunction<Tuple3<T, T, IntValue>, Result<T>> {
 		private boolean unboundedScores;
@@ -440,8 +442,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 						&& count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) {
 				output.f0 = edge.f0;
 				output.f1 = edge.f1;
-				output.f2.f0.setValue(count);
-				output.f2.f1.setValue(distinctNeighbors);
+				output.f2.setValue(count);
+				output.f3.setValue(distinctNeighbors);
 				out.collect(output);
 			}
 		}
@@ -453,13 +455,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Edge<T, Tuple2<IntValue, IntValue>> {
+	extends Tuple4<T, T, IntValue, IntValue>
+	implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
 		public static final int HASH_SEED = 0x731f73e7;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
 
 		public Result() {
-			f2 = new Tuple2<>(new IntValue(), new IntValue());
+			f2 = new IntValue();
+			f3 = new IntValue();
+		}
+
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public T getVertexId1() {
+			return f1;
 		}
 
 		/**
@@ -468,7 +482,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return shared neighbor count
 		 */
 		public IntValue getSharedNeighborCount() {
-			return f2.f0;
+			return f2;
 		}
 
 		/**
@@ -477,7 +491,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return distinct neighbor count
 		 */
 		public IntValue getDistinctNeighborCount() {
-			return f2.f1;
+			return f3;
 		}
 
 		/**
@@ -491,8 +505,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return getSharedNeighborCount().getValue() / (double) getDistinctNeighborCount().getValue();
 		}
 
-		public String toVerboseString() {
-			return "Vertex IDs: (" + f0 + ", " + f1
+		public String toPrintableString() {
+			return "Vertex IDs: (" + getVertexId0() + ", " + getVertexId1()
 				+ "), number of shared neighbors: " + getSharedNeighborCount()
 				+ ", number of distinct neighbors: " + getDistinctNeighborCount()
 				+ ", jaccard index score: " + getJaccardIndexScore();
@@ -503,9 +517,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return hasher.reset()
 				.hash(f0.hashCode())
 				.hash(f1.hashCode())
-				.hash(f2.f0.getValue())
-				.hash(f2.f1.getValue())
+				.hash(f2.getValue())
+				.hash(f3.getValue())
 				.hash();
 		}
+
+		@Override
+		public int compareTo(Result<T> o) {
+			// exact comparison of a/b with x/y using only integer math:
+			// a/b <?> x/y == a*y <?> b*x
+
+			long ay = getSharedNeighborCount().getValue() * (long)o.getDistinctNeighborCount().getValue();
+			long bx = getDistinctNeighborCount().getValue() * (long)o.getSharedNeighborCount().getValue();
+
+			return Long.compare(ay, bx);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 0c93fa9..5b9e18f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -38,7 +38,9 @@ public class GraphUtils {
 		return input
 			.map(new MapTo<T, LongValue>(new LongValue(1)))
 				.returns(LONG_VALUE_TYPE_INFO)
-			.reduce(new AddLongValue());
+				.name("Emit 1")
+			.reduce(new AddLongValue())
+				.name("Sum");
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
index 77d9dba..9aaa062 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
@@ -40,12 +40,12 @@ extends AsmTestBase {
 	public void testSimpleGraph()
 			throws Exception {
 		String expectedResult =
-			"(0,(2,1))\n" +
-			"(1,(3,2))\n" +
-			"(2,(3,2))\n" +
-			"(3,(4,1))\n" +
-			"(4,(1,0))\n" +
-			"(5,(1,0))";
+			"(0,2,1)\n" +
+			"(1,3,2)\n" +
+			"(2,3,2)\n" +
+			"(3,4,1)\n" +
+			"(4,1,0)\n" +
+			"(5,1,0)";
 
 		DataSet<Result<IntValue>> cc = directedSimpleGraph
 			.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index ba0834c..e9097dc 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -41,12 +41,12 @@ extends AsmTestBase {
 	public void testSimpleGraph()
 			throws Exception {
 		String expectedResult =
-			"(0,(2,1))\n" +
-			"(1,(3,2))\n" +
-			"(2,(3,2))\n" +
-			"(3,(4,1))\n" +
-			"(4,(1,0))\n" +
-			"(5,(1,0))";
+			"(0,2,1)\n" +
+			"(1,3,2)\n" +
+			"(2,3,2)\n" +
+			"(3,4,1)\n" +
+			"(4,1,0)\n" +
+			"(5,1,0)";
 
 		DataSet<Result<IntValue>> cc = undirectedSimpleGraph
 			.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index bc3914e..dee066c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -38,7 +38,7 @@ extends AsmTestBase {
 	@Test
 	public void testSimpleGraph()
 			throws Exception {
-		DataSet<Tuple3<IntValue, IntValue, IntValue>> tl = undirectedSimpleGraph
+		DataSet<Result<IntValue>> tl = undirectedSimpleGraph
 			.run(new TriangleListing<IntValue, NullValue, NullValue>()
 				.setSortTriangleVertices(true));
 
@@ -55,10 +55,10 @@ extends AsmTestBase {
 		long expectedDegree = completeGraphVertexCount - 1;
 		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
 
-		DataSet<Tuple3<LongValue, LongValue, LongValue>> tl = completeGraph
+		DataSet<Result<LongValue>> tl = completeGraph
 			.run(new TriangleListing<LongValue, NullValue, NullValue>());
 
-		Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
 			.run(tl)
 			.execute();
 
@@ -68,11 +68,11 @@ extends AsmTestBase {
 	@Test
 	public void testRMatGraph()
 			throws Exception {
-		DataSet<Tuple3<LongValue, LongValue, LongValue>> tl = undirectedRMatGraph
+		DataSet<Result<LongValue>> tl = undirectedRMatGraph
 			.run(new TriangleListing<LongValue, NullValue, NullValue>()
 				.setSortTriangleVertices(true));
 
-		Checksum checksum = new ChecksumHashCode<Tuple3<LongValue, LongValue, LongValue>>()
+		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
 			.run(tl)
 			.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 9490459..d69a441 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -47,17 +47,17 @@ extends AsmTestBase {
 			.run(new JaccardIndex<IntValue, NullValue, NullValue>());
 
 		String expectedResult =
-			"(0,1,(1,4))\n" +
-			"(0,2,(1,4))\n" +
-			"(0,3,(2,4))\n" +
-			"(1,2,(2,4))\n" +
-			"(1,3,(1,6))\n" +
-			"(1,4,(1,3))\n" +
-			"(1,5,(1,3))\n" +
-			"(2,3,(1,6))\n" +
-			"(2,4,(1,3))\n" +
-			"(2,5,(1,3))\n" +
-			"(4,5,(1,1))\n";
+			"(0,1,1,4)\n" +
+			"(0,2,1,4)\n" +
+			"(0,3,2,4)\n" +
+			"(1,2,2,4)\n" +
+			"(1,3,1,6)\n" +
+			"(1,4,1,3)\n" +
+			"(1,5,1,3)\n" +
+			"(2,3,1,6)\n" +
+			"(2,4,1,3)\n" +
+			"(2,5,1,3)\n" +
+			"(4,5,1,1)\n";
 
 		TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
 	}
@@ -70,9 +70,9 @@ extends AsmTestBase {
 				.setMinimumScore(1, 2));
 
 		String expectedResult =
-			"(0,3,(2,4))\n" +
-			"(1,2,(2,4))\n" +
-			"(4,5,(1,1))\n";
+			"(0,3,2,4)\n" +
+			"(1,2,2,4)\n" +
+			"(4,5,1,1)\n";
 
 		TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
 	}
@@ -85,14 +85,14 @@ extends AsmTestBase {
 				.setMaximumScore(1, 2));
 
 		String expectedResult =
-			"(0,1,(1,4))\n" +
-			"(0,2,(1,4))\n" +
-			"(1,3,(1,6))\n" +
-			"(1,4,(1,3))\n" +
-			"(1,5,(1,3))\n" +
-			"(2,3,(1,6))\n" +
-			"(2,4,(1,3))\n" +
-			"(2,5,(1,3))\n";
+			"(0,1,1,4)\n" +
+			"(0,2,1,4)\n" +
+			"(1,3,1,6)\n" +
+			"(1,4,1,3)\n" +
+			"(1,5,1,3)\n" +
+			"(2,3,1,6)\n" +
+			"(2,4,1,3)\n" +
+			"(2,5,1,3)\n";
 
 		TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
index 280eb92..3e375b5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -54,7 +54,6 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
 		List<Tuple2<Long, LongValue>> result = data.collect();
 
-
 		expectedResult = "1,2\n" +
 			"2,1\n" +
 			"3,2\n" +
@@ -74,7 +73,6 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 			TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
-
 		DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
 		List<Tuple2<Long, LongValue>> result = data.collect();
 


Mime
View raw message