flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/6] flink git commit: [FLINK-5461] [gelly] Remove Superfluous TypeInformation Declaration
Date Tue, 17 Jan 2017 21:35:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master ece899a9f -> cb2820675


[FLINK-5461] [gelly] Remove Superfluous TypeInformation Declaration

FLINK-4624 updated Gelly's Summarization algorithm to use
Either<NullValue, VV> in order to support types whose serialization
does not support null values. This required explicit TypeInformation
because of limitations in the TypeExtractor's handling of the Either
type. FLINK-4673 created a TypeInfoFactory for EitherType, so the
explicit TypeInformation can now be removed.

This closes #3096


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b408d61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b408d61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b408d61f

Branch: refs/heads/master
Commit: b408d61f707806d2a188c55065d1187f4f05099a
Parents: ece899a
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Jan 11 13:43:58 2017 -0500
Committer: Greg Hogan <code@greghogan.com>
Committed: Tue Jan 17 15:37:53 2017 -0500

----------------------------------------------------------------------
 .../flink/graph/library/Summarization.java      | 26 +++-----------------
 1 file changed, 4 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b408d61f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 2361247..fed4d89 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -23,15 +23,10 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -97,31 +92,18 @@ public class Summarization<K, VV, EV>
 
 	@Override
 	public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV>
input) throws Exception {
-		// create type infos for correct return type definitions
-		// Note: this use of type hints is only required due to
-		// limitations of the type parser for the Either type
-		// which are being fixed in FLINK-4673
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(0);
-		TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
-		@SuppressWarnings("unchecked")
-		TupleTypeInfo<Vertex<K, VertexValue<VV>>> vertexType = (TupleTypeInfo<Vertex<K,
VertexValue<VV>>>) new TupleTypeInfo(
-			Vertex.class, keyType, new TupleTypeInfo(VertexValue.class, valueType, BasicTypeInfo.LONG_TYPE_INFO));
-
 		// -------------------------
 		// build super vertices
 		// -------------------------
 
-		// group vertices by value
-		UnsortedGrouping<Vertex<K, VV>> vertexUnsortedGrouping = input.getVertices()
-				.groupBy(1);
-		// reduce vertex group and create vertex group items
-		GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems
= vertexUnsortedGrouping
+		// group vertices by value and create vertex group items
+		DataSet<VertexGroupItem<K, VV>> vertexGroupItems = input.getVertices()
+				.groupBy(1)
 				.reduceGroup(new VertexGroupReducer<K, VV>());
 		// create super vertices
 		DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
 				.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
-				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>())
-				.returns(vertexType);
+				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
 
 		// -------------------------
 		// build super edges


Mime
View raw message