flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [1/2] flink git commit: [FLINK-5097][gelly] Add missing input type information to TypeExtractor in mapVertices, mapEdges, fromDataSet, and groupReduceOnEdges
Date Fri, 16 Dec 2016 10:24:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4666e65ef -> 3d41f2b82


[FLINK-5097][gelly] Add missing input type information to TypeExtractor
in mapVertices, mapEdges, fromDataSet, and groupReduceOnEdges

This closes #2842


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88e458b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88e458b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88e458b4

Branch: refs/heads/master
Commit: 88e458b47c27d097c18674f1d5b9630349aeb129
Parents: 4666e65
Author: vasia <vasia@apache.org>
Authored: Sat Nov 19 15:35:43 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Dec 16 10:36:49 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  47 +++-----
 .../test/operations/TypeExtractorTest.java      | 119 +++++++++++++++++++
 2 files changed, 133 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c6843e4..0ee03c2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K, VV, EV> {
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
 		TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
-				MapFunction.class, vertexValueInitializer.getClass(), 1, null, null);
+				MapFunction.class, vertexValueInitializer.getClass(), 1, keyType, null);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
@@ -529,7 +529,7 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
 
 		TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
 				Vertex.class, keyType, valueType);
@@ -573,7 +573,7 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
 
 		TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
 				Edge.class, keyType, keyType, valueType);
@@ -1002,7 +1002,7 @@ public class Graph<K, VV, EV> {
 			return vertices.coGroup(edges).where(0).equalTo(0)
 					.with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges");
 		case ALL:
-			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>())
 						.name("Emit edge"))
 					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
 						.name("GroupReduce on in- and out-edges");
@@ -1039,7 +1039,7 @@ public class Graph<K, VV, EV> {
 						.with(new ApplyCoGroupFunction<>(edgesFunction))
 							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>())
 							.name("Emit edge"))
 						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
 							.name("GroupReduce on in- and out-edges").returns(typeInfo);
@@ -1065,24 +1065,12 @@ public class Graph<K, VV, EV> {
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
 			EdgeDirection direction) throws IllegalArgumentException {
 
-		switch (direction) {
-		case IN:
-			return edges.map(new ProjectVertexIdMap<K, EV>(1))
-					.withForwardedFields("f1->f0").name("Vertex ID")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-						.name("GroupReduce on in-edges");
-		case OUT:
-			return edges.map(new ProjectVertexIdMap<K, EV>(0))
-					.withForwardedFields("f0").name("Vertex ID")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-						.name("GroupReduce on out-edges");
-		case ALL:
-			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
-				.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-					.name("GroupReduce on in- and out-edges");
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
+		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
+		TypeInformation<T> returnType = TypeExtractor.createTypeInfo(EdgesFunction.class, edgesFunction.getClass(), 2,
+			keyType, edgeValueType);
+
+		return groupReduceOnEdges(edgesFunction, direction, returnType);
 	}
 
 	/**
@@ -1115,7 +1103,7 @@ public class Graph<K, VV, EV> {
 						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
 							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+				return edges.flatMap(new EmitOneEdgePerNode<K, EV>()).name("Emit edge")
 						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
 							.name("GroupReduce on in- and out-edges").returns(typeInfo);
 			default:
@@ -1153,8 +1141,7 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
-		Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
+	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T> {
 
 		private EdgesFunction<K, EV, T> function;
 
@@ -1165,14 +1152,9 @@ public class Graph<K, VV, EV> {
 		public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
 			function.iterateEdges(edges, out);
 		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null);
-		}
 	}
 
-	private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
+	private static final class EmitOneEdgePerNode<K, EV> implements FlatMapFunction<
 		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
@@ -1219,7 +1201,6 @@ public class Graph<K, VV, EV> {
 				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
 			}
 		}
-
 		@Override
 		public TypeInformation<T> getProducedType() {
 			return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,

http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
new file mode 100644
index 0000000..484ef3d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.*;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TypeExtractorTest {
+
+	private Graph<Long, Long, Long> inputGraph;
+	private DataSet<Vertex<Long, Long>> vertices;
+	private DataSet<Edge<Long, Long>> edges;
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setUp() throws Exception {
+		env = ExecutionEnvironment.getExecutionEnvironment();
+		vertices = TestGraphUtils.getLongLongVertexData(env);
+		edges = TestGraphUtils.getLongLongEdgeData(env);
+		inputGraph = Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@Test
+	public void testMapVerticesType() throws Exception {
+
+		// test type extraction in mapVertices
+		DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = inputGraph.mapVertices(new VertexMapper<Long>()).getVertices();
+		Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outVertices.getType()));
+	}
+
+	@Test
+	public void testMapEdgesType() throws Exception {
+
+		// test type extraction in mapEdges
+		DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = inputGraph.mapEdges(new EdgeMapper<Long>()).getEdges();
+		Assert.assertTrue(new TupleTypeInfo(Edge.class, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outEdges.getType()));
+	}
+
+	@Test
+	public void testFromDataSet() throws Exception {
+		DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = Graph.fromDataSet(edges, new VertexInitializer<Long>(), env)
+			.getVertices();
+		Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outVertices.getType()));
+	}
+
+	@Test
+	public void testGroupReduceOnEdges() throws Exception {
+		DataSet<Tuple2<Long, Long>> output = inputGraph.groupReduceOnEdges(new EdgesGroupFunction<Long, Long>(), EdgeDirection.OUT);
+		Assert.assertTrue((new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)).equals(output.getType()));
+	}
+
+	public static final class VertexMapper<K> implements MapFunction<Vertex<K, Long>, Tuple2<K, Integer>> {
+
+		private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+		@Override
+		public Tuple2<K, Integer> map(Vertex<K, Long> inputVertex) throws Exception {
+			return outTuple;
+		}
+	}
+
+	public static final class EdgeMapper<K> implements MapFunction<Edge<K, Long>, Tuple2<K, Integer>> {
+
+		private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+		@Override
+		public Tuple2<K, Integer> map(Edge<K, Long> inputEdge) throws Exception {
+			return outTuple;
+		}
+	}
+
+	public static final class EdgesGroupFunction<K, EV> implements EdgesFunction<K, EV, Tuple2<K, EV>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<Tuple2<K, EV>> out) throws Exception {
+			out.collect(new Tuple2<K, EV>());
+		}
+	}
+
+	public static final class VertexInitializer<K> implements MapFunction<K, Tuple2<K, Integer>> {
+
+		@Override
+		public Tuple2<K, Integer> map(K value) throws Exception {
+			return null;
+		}
+	}
+}


Mime
View raw message