flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [2/2] flink git commit: [FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms
Date Fri, 22 Jul 2016 18:38:42 GMT
[FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms

This closes #2248


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2ef74ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2ef74ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2ef74ea

Branch: refs/heads/master
Commit: e2ef74ea5a854555f86aefbd8a6b1889ef188ff1
Parents: 54f02ec
Author: Greg Hogan <code@greghogan.com>
Authored: Thu Jul 14 09:39:02 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Fri Jul 22 14:07:19 2016 -0400

----------------------------------------------------------------------
 .../degree/annotate/directed/VertexDegrees.java | 24 ++++++++++++++------
 .../annotate/directed/VertexInDegree.java       |  2 ++
 .../annotate/directed/VertexOutDegree.java      |  2 ++
 .../annotate/undirected/VertexDegree.java       |  2 ++
 .../graph/generator/GraphGeneratorUtils.java    | 14 ++++--------
 .../directed/LocalClusteringCoefficient.java    |  2 ++
 .../undirected/LocalClusteringCoefficient.java  |  2 ++
 .../flink/graph/library/link_analysis/HITS.java |  6 +++++
 8 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 363ad2e..9fef221 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -21,9 +21,9 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -122,7 +122,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 				.setParallelism(parallelism)
 				.name("Emit and flip edge")
 			.groupBy(0, 1)
-				.reduce(new ReduceBitmask<K>())
+			.reduceGroup(new ReduceBitmask<K>())
 				.setParallelism(parallelism)
 				.name("Reduce bitmask");
 
@@ -177,13 +177,23 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	 *
 	 * @param <T> ID type
 	 */
-	private static class ReduceBitmask<T>
-	implements ReduceFunction<Tuple3<T, T, ByteValue>> {
+	@ForwardedFields("0; 1")
+	private static final class ReduceBitmask<T>
+	implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Tuple3<T, T, ByteValue>> {
 		@Override
-		public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> left, Tuple3<T, T, ByteValue> right)
+		public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Tuple3<T, T, ByteValue>> out)
 				throws Exception {
-			left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue()));
-			return left;
+			Tuple3<T, T, ByteValue> output = null;
+
+			byte bitmask = 0;
+
+			for (Tuple3<T, T, ByteValue> value: values) {
+				output = value;
+				bitmask |= value.f2.getValue();
+			}
+
+			output.f2.setValue(bitmask);
+			out.collect(output);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 75f2369..f7ac18b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> targetDegree = targetIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index b0576f8..e235f6a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -119,6 +120,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index ec72222..42f084d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.graph.Edge;
@@ -143,6 +144,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> degree = vertexIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index a7b5ce9..01cb2d1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -50,12 +50,10 @@ public class GraphGeneratorUtils {
 				.setParallelism(parallelism)
 				.name("Vertex iterators");
 
-		DataSet<Vertex<LongValue,NullValue>> vertexSequence = vertexLabels
+		return vertexLabels
 			.map(new CreateVertex())
 				.setParallelism(parallelism)
 				.name("Vertex sequence");
-
-		return vertexSequence;
 	}
 
 	@ForwardedFields("*->f0")
@@ -73,7 +71,7 @@ public class GraphGeneratorUtils {
 		}
 	}
 
-	/**************************************************************************/
+	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
@@ -84,7 +82,7 @@ public class GraphGeneratorUtils {
 	 * @param <EV> edge value type
 	 * @return {@link DataSet} of discovered {@link Vertex Vertices}
 	 *
-	 * @see {@link Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)}
+	 * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
 	 */
 	public static <K,EV> DataSet<Vertex<K,NullValue>> vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
 		DataSet<Vertex<K,NullValue>> vertexSet = edges
@@ -92,16 +90,14 @@ public class GraphGeneratorUtils {
 				.setParallelism(parallelism)
 				.name("Emit source and target labels");
 
-		DataSet<Vertex<K,NullValue>> distinctVertexSet = vertexSet
+		return vertexSet
 			.distinct()
 				.setParallelism(parallelism)
 				.name("Emit vertex labels");
-
-		return distinctVertexSet;
 	}
 
 	/**
-	 * @see {@link Graph.EmitSrcAndTarget}
+	 * @see Graph.EmitSrcAndTarget
 	 */
 	private static final class EmitSrcAndTarget<K,EV>
 	implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 537ad0f..e0defcd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.directed;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -121,6 +122,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
 			.reduce(new CountTriangles<K>())
+				.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
 		// u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 8f707fd..cd859d9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.library.clustering.undirected;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -122,6 +123,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
 			.reduce(new CountTriangles<K>())
+				.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
 		// u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ef74ea/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index b88badb..60e99bd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
@@ -169,6 +170,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 				.name("Initial scores")
 			.groupBy(0)
 			.reduce(new SumScores<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -185,6 +187,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 				.name("Hub")
 			.groupBy(0)
 			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -194,6 +197,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -207,6 +211,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 				.name("Authority")
 			.groupBy(0)
 			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -216,6 +221,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
+				.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 


Mime
View raw message