flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [8/9] flink git commit: [FLINK-1514] [gelly] Simplified GatherUdf; added forwarded fields annotations
Date Sun, 26 Apr 2015 12:01:11 GMT
[FLINK-1514] [gelly] Simplified GatherUdf; added forwarded fields annotations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40f5f3a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40f5f3a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40f5f3a0

Branch: refs/heads/master
Commit: 40f5f3a0160df708ecc78356853138e640b59da5
Parents: 66d72ac
Author: vasia <vasia@apache.org>
Authored: Thu Apr 23 21:56:56 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200

----------------------------------------------------------------------
 .../graph/gsa/GatherSumApplyIteration.java      | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40f5f3a0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 992f840..7fcd427 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -19,11 +19,14 @@
 package org.apache.flink.graph.gsa;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.JoinOperator;
@@ -119,11 +122,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
 
 		// Prepare the neighbors
-		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
-				.getWorkset()
-				.join(edgeDataSet)
-				.where(0)
-				.equalTo(0);
+		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+				.getWorkset().join(edgeDataSet)
+				.where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>());
 
 		// Gather, sum and apply
 		DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
@@ -170,8 +171,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("serial")
+	@ForwardedFields("f0")
 	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
+			EV extends Serializable, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
 			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
 
 		private final GatherFunction<VV, EV, M> gatherFunction;
@@ -183,13 +185,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 
 		@Override
-		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
-			Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
-					neighborTuple.f1.getValue());
-
-			K key = neighborTuple.f1.getTarget();
-			M result = this.gatherFunction.gather(neighbor);
-			return new Tuple2<K, M>(key, result);
+		public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
+			M result = this.gatherFunction.gather(neighborTuple.f1);
+			return new Tuple2<K, M>(neighborTuple.f0, result);
 		}
 
 		@Override
@@ -302,4 +300,16 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 	}
 
+	@SuppressWarnings("serial")
+	@ForwardedFieldsSecond("f1->f0")
+	private static final class ProjectKeyWithNeighbor<K extends Comparable<K> & Serializable,
+			VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<
+			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+					edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+		}
+	}
+
 }


Mime
View raw message