flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [16/50] [abbrv] flink git commit: [FLINK-1201] [gelly] fix ClassCastException and Type errors in mapVertices; fixes #41 and #46
Date Wed, 11 Feb 2015 10:49:18 GMT
[FLINK-1201] [gelly] fix ClassCastException and Type errors in mapVertices; fixes #41 and #46


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b751df28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b751df28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b751df28

Branch: refs/heads/master
Commit: b751df28863af4a6216d967067dbb6f3729b66da
Parents: 3cf734f
Author: vasia <vasilikikalavri@gmail.com>
Authored: Tue Jan 6 20:54:25 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 51 ++++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b751df28/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 03fbf94..1cd5c90 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -106,7 +106,9 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
      * @return a new graph
      */
     public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K,
VV>, NV> mapper) {
-    	DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K,
VV, NV>(mapper));
+    	TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
+    	DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K,
VV, NV>(mapper,
+    			keyType));
         return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context);
     }
     
@@ -115,19 +117,24 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		<Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>>
{
 	
 		private MapFunction<Vertex<K, VV>, NV> innerMapper;
-		public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper)
{
+		private transient TypeInformation<K> keyType;
+		public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper,
TypeInformation<K> keyType) {
 			this.innerMapper = theMapper;
+			this.keyType = keyType;
 		}
 		
 		public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
 			return new Vertex<K, NV>(value.f0, innerMapper.map(value));
 		}
 	
+		@SuppressWarnings("unchecked")
 		@Override
 		public TypeInformation<Vertex<K, NV>> getProducedType() {
-			return new TupleTypeInfo<Vertex<K, NV>>(
-					((TupleTypeInfo<?>)(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(),
0, null, null))).getTypeAt(0),
-					TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
+			TypeInformation<NV> valueType = TypeExtractor
+					.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
+			@SuppressWarnings("rawtypes")
+			TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType,
valueType);
+			return (TypeInformation<Vertex<K, NV>>) returnType;
 		}
     }
 
@@ -137,7 +144,9 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
      * @return 
      */
     public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K,
EV>, NV> mapper) {
-    	DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K,
EV, NV>(mapper));
+    	TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
+    	DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K,
EV, NV>(mapper,
+    			keyType));
         return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
     }
     
@@ -146,21 +155,25 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		<Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>>
{
 	
 		private MapFunction<Edge<K, EV>, NV> innerMapper;
+		private transient TypeInformation<K> keyType;
 		
-		public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper) {
+		public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper, TypeInformation<K>
keyType) {
 			this.innerMapper = theMapper;
+			this.keyType = keyType;
 		}
 		
 		public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
 			return new Edge<K, NV>(value.f0, value.f1, innerMapper.map(value));
 		}
 	
+		@SuppressWarnings("unchecked")
 		@Override
 		public TypeInformation<Edge<K, NV>> getProducedType() {
-			TypeInformation<K> keyType = ((TupleTypeInfo<?>)
-					(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null))).getTypeAt(0);
-			return new TupleTypeInfo<Edge<K, NV>>(keyType, keyType,
-					TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
+			TypeInformation<NV> valueType = TypeExtractor
+					.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
+			@SuppressWarnings("rawtypes")
+			TypeInformation<?> returnType = new TupleTypeInfo<Edge>(Edge.class, keyType,
keyType, valueType);
+			return (TypeInformation<Edge<K, NV>>) returnType;
 			}
     }
 
@@ -604,9 +617,10 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable> 
 		Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K,
VV> mapper, 
 				ExecutionEnvironment context) {
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 		DataSet<Vertex<K, VV>> vertices = 
 				edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
-				.distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper));
+				.distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType));
 		return new Graph<K, VV, EV>(vertices, edges, context);
 	}
 	
@@ -615,20 +629,25 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		<Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>>
{
 
 		private MapFunction<K, VV> innerMapper;
+		private transient TypeInformation<K> keyType;
 
-		public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper) {
+		public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper, TypeInformation<K>
keyType) {
 			this.innerMapper = theMapper;
+			this.keyType = keyType;
 		}
 
 		public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
 			return new Vertex<K, VV>(value.f0, innerMapper.map(value.f0));
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public TypeInformation<Vertex<K, VV>> getProducedType() {
-			return new TupleTypeInfo<Vertex<K, VV>>(
-					TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null),
-					TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null));
+			TypeInformation<VV> valueType = TypeExtractor
+					.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null);
+			@SuppressWarnings("rawtypes")
+			TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType,
valueType);
+			return (TypeInformation<Vertex<K, VV>>) returnType;
 		}
 	}
 	


Mime
View raw message