flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas
Date Wed, 30 Mar 2016 17:20:27 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 ce448cdbd -> dae29b425


[FLINK-3681] [cep, typeextractor] Generalize TypeExtractor to support more lambdas

The TypeExtractor.getUnaryOperatorReturnType and TypeExtractor.getBinaryOperatorReturnType
methods have been extended to accept type-argument indices for the input and output types. This makes
it possible to support parameterized types as Java 8 lambda arguments where the input type is not
given by the first type argument (e.g. Map<String, T>). It also fixes the problem that the CEP
library did not support Java 8 lambdas as select functions.

This closes #1840.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a96e1a69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a96e1a69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a96e1a69

Branch: refs/heads/release-1.0
Commit: a96e1a6939e60717aae4d5d1fe61c5a6356ae5b1
Parents: ce448cd
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Mar 30 14:55:27 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Mar 30 19:19:48 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 197 ++++++++++++++++---
 flink-java8/pom.xml                             |   7 +
 .../org/apache/flink/cep/CEPLambdaTest.java     |  93 +++++++++
 .../org/apache/flink/cep/PatternStream.java     |   8 +-
 4 files changed, 275 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a96e1a69/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index fdebffd..151f359 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -60,7 +60,6 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
 
 import org.apache.hadoop.io.Writable;
 
@@ -276,13 +275,70 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	//  Generic extraction methods
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Returns the unary operator's return type.
+	 *
+	 * @param function Function to extract the return type from
+	 * @param baseClass Base class of the function
+	 * @param hasIterable True if the first function parameter is an iterable, otherwise false
+	 * @param hasCollector True if the function has an additional collector parameter, otherwise
false
+	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
+	 * @param functionName Function name
+	 * @param allowMissing Can the type information be missing
+	 * @param <IN> Input type
+	 * @param <OUT> Output type
+	 * @return TypeInformation of the return type of the function
+	 */
 	@SuppressWarnings("unchecked")
 	@PublicEvolving
-	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function
function, Class<?> baseClass, 
-			boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType,
-			String functionName, boolean allowMissing)
-	{
+	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
+		Function function,
+		Class<?> baseClass,
+		boolean hasIterable,
+		boolean hasCollector,
+		TypeInformation<IN> inType,
+		String functionName,
+		boolean allowMissing) {
+
+		return getUnaryOperatorReturnType(
+			function,
+			baseClass,
+			hasIterable ? 0 : -1,
+			hasCollector ? 0 : -1,
+			inType,
+			functionName,
+			allowMissing);
+	}
+
+	/**
+	 * Returns the unary operator's return type.
+	 *
+	 * @param function Function to extract the return type from
+	 * @param baseClass Base class of the function
+	 * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
+	 *                               specifying the input type if it is wrapped (Iterable, Map,
+	 *                               etc.). Otherwise -1.
+	 * @param outputTypeArgumentIndex Index of the type argument of functions second parameter
+	 *                                specifying the output type if it is wrapped in a Collector.
+	 *                                Otherwise -1.
+	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
+	 * @param functionName Function name
+	 * @param allowMissing Can the type information be missing
+	 * @param <IN> Input type
+	 * @param <OUT> Output type
+	 * @return TypeInformation of the return type of the function
+	 */
+	@SuppressWarnings("unchecked")
+	@PublicEvolving
+	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
+		Function function,
+		Class<?> baseClass,
+		int inputTypeArgumentIndex,
+		int outputTypeArgumentIndex,
+		TypeInformation<IN> inType,
+		String functionName,
+		boolean allowMissing) {
 		try {
 			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
 			if (m != null) {
@@ -291,12 +347,15 @@ public class TypeExtractor {
 				
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g.
when using local variables inside lambda function
 				final int paramLen = m.getGenericParameterTypes().length - 1;
-				final Type input = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
-				validateInputType((hasIterable)?removeGenericWrapper(input) : input, inType);
+				final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen
- 1] : m.getGenericParameterTypes()[paramLen];
+				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex)
: input, inType);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
-				return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen])
: m.getGenericReturnType(), inType, null);
+				return new TypeExtractor().privateCreateTypeInfo(
+					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen],
outputTypeArgumentIndex) : m.getGenericReturnType(),
+					inType,
+					null);
 			}
 			else {
 				validateInputType(baseClass, function.getClass(), 0, inType);
@@ -314,13 +373,78 @@ public class TypeExtractor {
 			}
 		}
 	}
-	
+
+	/**
+	 * Returns the binary operator's return type.
+	 *
+	 * @param function Function to extract the return type from
+	 * @param baseClass Base class of the function
+	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
+	 * @param hasCollector True if the function has an additional collector parameter, otherwise
false
+	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the
element type)
+	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the
element type)
+	 * @param functionName Function name
+	 * @param allowMissing Can the type information be missing
+	 * @param <IN1> Left side input type
+	 * @param <IN2> Right side input type
+	 * @param <OUT> Output type
+	 * @return TypeInformation of the return type of the function
+	 */
 	@SuppressWarnings("unchecked")
 	@PublicEvolving
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function
function, Class<?> baseClass,
-			boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2>
in2Type,
-			String functionName, boolean allowMissing)
-	{
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
+		Function function,
+		Class<?> baseClass,
+		boolean hasIterables,
+		boolean hasCollector,
+		TypeInformation<IN1> in1Type,
+		TypeInformation<IN2> in2Type,
+		String functionName,
+		boolean allowMissing) {
+
+		return getBinaryOperatorReturnType(
+			function,
+			baseClass,
+			hasIterables ? 0 : -1,
+			hasCollector ? 0 : -1,
+			in1Type,
+			in2Type,
+			functionName,
+			allowMissing
+		);
+	}
+
+	/**
+	 * Returns the binary operator's return type.
+	 *
+	 * @param function Function to extract the return type from
+	 * @param baseClass Base class of the function
+	 * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
+	 *                               specifying the input type if it is wrapped (Iterable, Map,
+	 *                               etc.). Otherwise -1.
+	 * @param outputTypeArgumentIndex Index of the type argument of functions second parameter
+	 *                                specifying the output type if it is wrapped in a Collector.
+	 *                                Otherwise -1.
+	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the
element type)
+	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the
element type)
+	 * @param functionName Function name
+	 * @param allowMissing Can the type information be missing
+	 * @param <IN1> Left side input type
+	 * @param <IN2> Right side input type
+	 * @param <OUT> Output type
+	 * @return TypeInformation of the return type of the function
+	 */
+	@SuppressWarnings("unchecked")
+	@PublicEvolving
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
+		Function function,
+		Class<?> baseClass,
+		int inputTypeArgumentIndex,
+		int outputTypeArgumentIndex,
+		TypeInformation<IN1> in1Type,
+		TypeInformation<IN2> in2Type,
+		String functionName,
+		boolean allowMissing) {
 		try {
 			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
 			if (m != null) {
@@ -329,14 +453,17 @@ public class TypeExtractor {
 				
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g.
when using local variables inside lambda function
 				final int paramLen = m.getGenericParameterTypes().length - 1;
-				final Type input1 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen
- 1];
-				final Type input2 = (hasCollector)? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
-				validateInputType((hasIterables)? removeGenericWrapper(input1) : input1, in1Type);
-				validateInputType((hasIterables)? removeGenericWrapper(input2) : input2, in2Type);
+				final Type input1 = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen
- 2] : m.getGenericParameterTypes()[paramLen - 1];
+				final Type input2 = (outputTypeArgumentIndex >= 0 ) ? m.getGenericParameterTypes()[paramLen
- 1] : m.getGenericParameterTypes()[paramLen];
+				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex)
: input1, in1Type);
+				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex)
: input2, in2Type);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
-				return new TypeExtractor().privateCreateTypeInfo((hasCollector)? removeGenericWrapper(m.getGenericParameterTypes()[paramLen])
: m.getGenericReturnType(), in1Type, in2Type);
+				return new TypeExtractor().privateCreateTypeInfo(
+					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen],
outputTypeArgumentIndex) : m.getGenericReturnType(),
+					in1Type,
+					in2Type);
 			}
 			else {
 				validateInputType(baseClass, function.getClass(), 0, in1Type);
@@ -1141,14 +1268,32 @@ public class TypeExtractor {
 		}
 		return fieldCount;
 	}
-	
-	private static Type removeGenericWrapper(Type t) {
-		if(t instanceof ParameterizedType 	&& 
-				(Collector.class.isAssignableFrom(typeToClass(t))
-						|| Iterable.class.isAssignableFrom(typeToClass(t)))) {
-			return ((ParameterizedType) t).getActualTypeArguments()[0];
+
+	/**
+	 * * This method extracts the n-th type argument from the given type. An InvalidTypesException
+	 * is thrown if the type does not have any type arguments or if the index exceeds the number
+	 * of type arguments.
+	 *
+	 * @param t Type to extract the type arguments from
+	 * @param index Index of the type argument to extract
+	 * @return The extracted type argument
+	 * @throws InvalidTypesException if the given type does not have any type arguments or if
the
+	 * index exceeds the number of type arguments.
+	 */
+	private static Type extractTypeArgument(Type t, int index) throws InvalidTypesException
{
+		if(t instanceof ParameterizedType) {
+			Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
+
+			if (index < 0 || index >= actualTypeArguments.length) {
+				throw new InvalidTypesException("Cannot extract the type argument with index " +
+					index + " because the type has only " + actualTypeArguments.length +
+					" type arguments.");
+			} else {
+				return actualTypeArguments[index];
+			}
+		} else {
+			throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
 		}
-		return t;
 	}
 	
 	private static void validateLambdaGenericParameters(Method m) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a96e1a69/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index c949aae..ec69887 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -69,6 +69,13 @@ under the License.
 			<artifactId>flink-examples-batch_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a96e1a69/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
new file mode 100644
index 0000000..2e6fcd9
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class CEPLambdaTest extends TestLogger {
+	public static class EventA {}
+
+	public static class EventB {}
+
+	/**
+	 * Tests that a Java8 lambda can be passed as a CEP select function
+	 */
+	@Test
+	public void testLambdaSelectFunction() {
+		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
+		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
+
+		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String,
EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
+
+		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+			StreamExecutionEnvironment.getExecutionEnvironment(),
+			new SourceTransformation<>(
+				"source",
+				null,
+				inputTpeInformation,
+				1));
+
+
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+
+		DataStream<EventB> result = patternStream.select(
+			map -> new EventB()
+		);
+
+		assertEquals(outputTypeInformation, result.getType());
+	}
+
+	/**
+	 * Tests that a Java8 labmda can be passed as a CEP flat select function
+	 */
+	@Test
+	public void testLambdaFlatSelectFunction() {
+		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
+		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
+
+		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String,
EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
+
+		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+			StreamExecutionEnvironment.getExecutionEnvironment(),
+			new SourceTransformation<>(
+				"source",
+				null,
+				inputTpeInformation,
+				1));
+
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+
+		DataStream<EventB> result = patternStream.flatSelect(
+			(map, collector) -> collector.collect(new EventB())
+		);
+
+		assertEquals(outputTypeInformation, result.getType());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a96e1a69/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 63ed3b4..88505a4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -64,8 +64,8 @@ public class PatternStream<T> {
 		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
-			false,
-			false,
+			1,
+			-1,
 			inputType,
 			null,
 			false);
@@ -93,8 +93,8 @@ public class PatternStream<T> {
 		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
-			false,
-			false,
+			1,
+			0,
 			inputType,
 			null,
 			false);


Mime
View raw message