flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [18/39] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.
Date Sat, 09 Aug 2014 12:39:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a41874c..e8ee0bb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -31,12 +31,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericFlatMap;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -60,64 +61,75 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	
 	@SuppressWarnings("unchecked")
-	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(GenericMap<IN, OUT> mapInterface, TypeInformation<IN> inType) {
-		validateInputType(GenericMap.class, mapInterface.getClass(), 0, inType);
+	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
+		validateInputType(MapFunction.class, mapInterface.getClass(), 0, inType);
 		if(mapInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericMap.class, mapInterface.getClass(), 1, inType, null);
+		return new TypeExtractor().privateCreateTypeInfo(MapFunction.class, mapInterface.getClass(), 1, inType, null);
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(GenericFlatMap<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
-		validateInputType(GenericFlatMap.class, flatMapInterface.getClass(), 0, inType);
+	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
+		validateInputType(FlatMapFunction.class, flatMapInterface.getClass(), 0, inType);
 		if(flatMapInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) flatMapInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericFlatMap.class, flatMapInterface.getClass(), 1, inType, null);
+		return new TypeExtractor().privateCreateTypeInfo(FlatMapFunction.class, flatMapInterface.getClass(), 1, inType, null);
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GenericGroupReduce<IN, OUT> groupReduceInterface,
+	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
 			TypeInformation<IN> inType) {
-		validateInputType(GenericGroupReduce.class, groupReduceInterface.getClass(), 0, inType);
+		validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType);
 		if(groupReduceInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) groupReduceInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericGroupReduce.class, groupReduceInterface.getClass(), 1, inType, null);
+		return new TypeExtractor().privateCreateTypeInfo(GroupReduceFunction.class, groupReduceInterface.getClass(), 1, inType, null);
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(GenericJoiner<IN1, IN2, OUT> joinInterface,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(GenericJoiner.class, joinInterface.getClass(), 0, in1Type);
-		validateInputType(GenericJoiner.class, joinInterface.getClass(), 1, in2Type);
+		validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 0, in1Type);
+		validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 1, in2Type);
 		if(joinInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericJoiner.class, joinInterface.getClass(), 2, in1Type, in2Type);
+		return new TypeExtractor().privateCreateTypeInfo(FlatJoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
+			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+		validateInputType(JoinFunction.class, joinInterface.getClass(), 0, in1Type);
+		validateInputType(JoinFunction.class, joinInterface.getClass(), 1, in2Type);
+		if(joinInterface instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType();
+		}
+		return new TypeExtractor().privateCreateTypeInfo(JoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type);
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(GenericCoGrouper<IN1, IN2, OUT> coGroupInterface,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 0, in1Type);
-		validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 1, in2Type);
+		validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 0, in1Type);
+		validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 1, in2Type);
 		if(coGroupInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) coGroupInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericCoGrouper.class, coGroupInterface.getClass(), 2, in1Type, in2Type);
+		return new TypeExtractor().privateCreateTypeInfo(CoGroupFunction.class, coGroupInterface.getClass(), 2, in1Type, in2Type);
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(GenericCrosser<IN1, IN2, OUT> crossInterface,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		validateInputType(GenericCrosser.class, crossInterface.getClass(), 0, in1Type);
-		validateInputType(GenericCrosser.class, crossInterface.getClass(), 1, in2Type);
+		validateInputType(CrossFunction.class, crossInterface.getClass(), 0, in1Type);
+		validateInputType(CrossFunction.class, crossInterface.getClass(), 1, in2Type);
 		if(crossInterface instanceof ResultTypeQueryable) {
 			return ((ResultTypeQueryable<OUT>) crossInterface).getProducedType();
 		}
-		return new TypeExtractor().privateCreateTypeInfo(GenericCrosser.class, crossInterface.getClass(), 2, in1Type, in2Type);
+		return new TypeExtractor().privateCreateTypeInfo(CrossFunction.class, crossInterface.getClass(), 2, in1Type, in2Type);
 	}
 	
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 8e0abcb..c786345 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -235,7 +235,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 		try {
 			for (; i < keyPositions.length; i++) {
 				int keyPos = keyPositions[i];
-				int cmp = comparators[i].compare(first.getField(keyPos), second.getField(keyPos));
+				int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
 				if (cmp != 0) {
 					return cmp;
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index 1159512..474b022 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
@@ -284,7 +282,7 @@ public class SemanticPropertiesTranslationTest {
 	
 	
 	@ConstantFields("*")
-	public static class WildcardConstantMapper<T> extends MapFunction<T, T> {
+	public static class WildcardConstantMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value)  {
@@ -293,7 +291,7 @@ public class SemanticPropertiesTranslationTest {
 	}
 	
 	@ConstantFields("0->0;1->1;2->2")
-	public static class IndividualConstantMapper<X, Y, Z> extends MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
+	public static class IndividualConstantMapper<X, Y, Z> extends RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
 
 		@Override
 		public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) {
@@ -302,7 +300,7 @@ public class SemanticPropertiesTranslationTest {
 	}
 	
 	@ConstantFields("0")
-	public static class ZeroConstantMapper<T> extends MapFunction<T, T> {
+	public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value)  {
@@ -312,7 +310,7 @@ public class SemanticPropertiesTranslationTest {
 	
 	@ConstantFieldsFirst("1 -> 0")
 	@ConstantFieldsSecond("1 -> 1")
-	public static class ForwardingTupleJoin<A, B, C, D> extends JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
+	public static class ForwardingTupleJoin<A, B, C, D> extends RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
 
 		@Override
 		public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> second) {
@@ -322,7 +320,7 @@ public class SemanticPropertiesTranslationTest {
 	
 	@ConstantFieldsFirst("0 -> 0")
 	@ConstantFieldsSecond("0 -> 1")
-	public static class ForwardingBasicJoin<A, B> extends JoinFunction<A, B, Tuple2<A, B>> {
+	public static class ForwardingBasicJoin<A, B> extends RichJoinFunction<A, B, Tuple2<A, B>> {
 
 		@Override
 		public Tuple2<A, B> join(A first, B second) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index db795d9..155bbd1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
@@ -128,8 +128,14 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			
 			assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
 			assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
-			assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
-			
+			if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
+				WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
+				assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass());
+			}
+			else {
+				assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
+			}
+
 			assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
 			
 			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
@@ -215,21 +221,21 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static class SolutionWorksetJoin extends JoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+	public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 		@Override
 		public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
 			return null;
 		}
 	}
 	
-	public static class NextWorksetMapper extends MapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+	public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
 		@Override
 		public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
 			return null;
 		}
 	}
 	
-	public static class IdentityMapper<T> extends MapFunction<T, T> {
+	public static class IdentityMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value) throws Exception {
@@ -237,7 +243,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 		}
 	}
 	
-	public static class SolutionWorksetCoGroup1 extends CoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+	public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
@@ -245,7 +251,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 		}
 	}
 	
-	public static class SolutionWorksetCoGroup2 extends CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+	public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
 
 		@Override
 		public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 9f6a6d8..8e457ce 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -53,7 +51,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 			
-			initialData.reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+			initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
 				public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 					return value1;
 				}
@@ -94,7 +92,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 			
 			initialData
 				.groupBy(2)
-				.reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}
@@ -141,7 +139,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 						return value.f1;
 					}
 				})
-				.reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+				.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
 					public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 						return value1;
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index b284052..c6ad73d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -23,16 +23,16 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.CrossFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.InvalidTypesException;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -68,7 +68,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testBasicType() {
 		// use getGroupReduceReturnTypes()
-		GroupReduceFunction<?, ?> function = new GroupReduceFunction<Boolean, Boolean>() {
+		RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -107,7 +107,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testWritableType() {
-		MapFunction<?, ?> function = new MapFunction<MyWritable, MyWritable>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<MyWritable, MyWritable>() {
 			private static final long serialVersionUID = 1L;
 			
 			@Override
@@ -127,7 +127,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testTupleWithBasicTypes() throws Exception {
 		// use getMapReturnTypes()
-		MapFunction<?, ?> function = new MapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -192,7 +192,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testTupleWithTuples() {
 		// use getFlatMapReturnTypes()
-		FlatMapFunction<?, ?> function = new FlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
+		RichFlatMapFunction<?, ?> function = new RichFlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -247,12 +247,12 @@ public class TypeExtractorTest {
 	@Test
 	public void testSubclassOfTuple() {
 		// use getJoinReturnTypes()
-		JoinFunction<?, ?, ?> function = new JoinFunction<CustomTuple, String, CustomTuple>() {
+		RichFlatJoinFunction<?, ?, ?> function = new RichFlatJoinFunction<CustomTuple, String, CustomTuple>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public CustomTuple join(CustomTuple first, String second) throws Exception {
-				return null;
+			public void join(CustomTuple first, String second, Collector<CustomTuple> out) throws Exception {
+				out.collect(null);
 			}			
 		};
 
@@ -295,7 +295,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testCustomType() {
 		// use getCrossReturnTypes()
-		CrossFunction<?, ?, ?> function = new CrossFunction<CustomType, Integer, CustomType>() {
+		RichCrossFunction<?, ?, ?> function = new RichCrossFunction<CustomType, Integer, CustomType>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -342,7 +342,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testTupleWithCustomType() {
 		// use getMapReturnTypes()
-		MapFunction<?, ?> function = new MapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -412,7 +412,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testTupleOfValues() {
 		// use getMapReturnTypes()
-		MapFunction<?, ?> function = new MapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -451,7 +451,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testGenericsNotInSuperclass() {
 		// use getMapReturnTypes()
-		MapFunction<?, ?> function = new MapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -494,7 +494,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testChainedGenericsNotInSuperclass() {
 		// use TypeExtractor
-		MapFunction<?, ?> function = new MapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -536,7 +536,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testGenericsInDirectSuperclass() {
 		// use TypeExtractor
-		MapFunction<?, ?> function = new MapFunction<ChainedThree, ChainedThree>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<ChainedThree, ChainedThree>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -562,7 +562,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testGenericsNotInSuperclassWithNonGenericClassAtEnd() {
 		// use TypeExtractor
-		MapFunction<?, ?> function = new MapFunction<ChainedFour, ChainedFour>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<ChainedFour, ChainedFour>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -587,7 +587,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testMissingTupleGenericsException() {
-		MapFunction<?, ?> function = new MapFunction<String, Tuple2>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple2>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -607,7 +607,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testTupleSupertype() {
-		MapFunction<?, ?> function = new MapFunction<String, Tuple>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -635,7 +635,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testSameGenericVariable() {
-		MapFunction<?, ?> function = new MapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -667,7 +667,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testNestedTupleGenerics() {
-		MapFunction<?, ?> function = new MapFunction<Nested<String, Integer>, Nested<String, Integer>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Nested<String, Integer>, Nested<String, Integer>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -706,7 +706,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testNestedTupleGenerics2() {
-		MapFunction<?, ?> function = new MapFunction<Nested2<Boolean>, Nested2<Boolean>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Nested2<Boolean>, Nested2<Boolean>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -746,7 +746,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testFunctionWithMissingGenerics() {
-		MapFunction function = new MapFunction() {
+		RichMapFunction function = new RichMapFunction() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -776,7 +776,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
 	}
 
-	public class IdentityMapper<T> extends MapFunction<T, T> {
+	public class IdentityMapper<T> extends RichMapFunction<T, T> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -807,7 +807,7 @@ public class TypeExtractorTest {
 		}
 	}
 
-	public class IdentityMapper2<T> extends MapFunction<Tuple2<T, String>, T> {
+	public class IdentityMapper2<T> extends RichMapFunction<Tuple2<T, String>, T> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -843,7 +843,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1));
 	}
 
-	public class IdentityMapper3<T, V> extends MapFunction<T, V> {
+	public class IdentityMapper3<T, V> extends RichMapFunction<T, V> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -916,7 +916,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testFunctionWithNoGenericSuperclass() {
-		MapFunction<?, ?> function = new Mapper2();
+		RichMapFunction<?, ?> function = new Mapper2();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
 
@@ -924,7 +924,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
 
-	public class OneAppender<T> extends MapFunction<T, Tuple2<T, Integer>> {
+	public class OneAppender<T> extends RichMapFunction<T, Tuple2<T, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		public Tuple2<T, Integer> map(T value) {
@@ -935,7 +935,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testFunctionDependingPartialOnInput() {
-		MapFunction<?, ?> function = new OneAppender<DoubleValue>() {
+		RichMapFunction<?, ?> function = new OneAppender<DoubleValue>() {
 			private static final long serialVersionUID = 1L;
 		};
 
@@ -955,7 +955,7 @@ public class TypeExtractorTest {
 
 	@Test
 	public void testFunctionDependingPartialOnInput2() {
-		MapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>();
+		RichMapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new ValueTypeInfo<DoubleValue>(DoubleValue.class));
 
@@ -971,7 +971,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(Integer.class , tti.getTypeAt(1).getTypeClass());
 	}
 
-	public class FieldDuplicator<T> extends MapFunction<T, Tuple2<T, T>> {
+	public class FieldDuplicator<T> extends RichMapFunction<T, Tuple2<T, T>> {
 		private static final long serialVersionUID = 1L;
 
 		public Tuple2<T, T> map(T value) {
@@ -981,7 +981,7 @@ public class TypeExtractorTest {
 
 	@Test
 	public void testFunctionInputInOutputMultipleTimes() {
-		MapFunction<Float, ?> function = new FieldDuplicator<Float>();
+		RichMapFunction<Float, ?> function = new FieldDuplicator<Float>();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.FLOAT_TYPE_INFO);
 
@@ -994,7 +994,7 @@ public class TypeExtractorTest {
 
 	@Test
 	public void testFunctionInputInOutputMultipleTimes2() {
-		MapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>();
+		RichMapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo<Tuple2<Float, Float>>(
 				BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.FLOAT_TYPE_INFO));
@@ -1023,7 +1023,7 @@ public class TypeExtractorTest {
 
 	@Test
 	public void testAbstractAndInterfaceTypesException() {
-		MapFunction<String, ?> function = new MapFunction<String, Testable>() {
+		RichMapFunction<String, ?> function = new RichMapFunction<String, Testable>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1039,7 +1039,7 @@ public class TypeExtractorTest {
 			// good
 		}
 
-		MapFunction<String, ?> function2 = new MapFunction<String, AbstractClass>() {
+		RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1059,7 +1059,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testValueSupertypeException() {
-		MapFunction<?, ?> function = new MapFunction<StringValue, Value>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<StringValue, Value>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1080,7 +1080,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testBasicArray() {
 		// use getCoGroupReturnTypes()
-		CoGroupFunction<?, ?, ?> function = new CoGroupFunction<String[], String[], String[]>() {
+		RichCoGroupFunction<?, ?, ?> function = new RichCoGroupFunction<String[], String[], String[]>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1107,7 +1107,7 @@ public class TypeExtractorTest {
 
 	@Test
 	public void testBasicArray2() {
-		MapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>();
+		RichMapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO);
 
@@ -1122,7 +1122,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testCustomArray() {
-		MapFunction<?, ?> function = new MapFunction<CustomArrayObject[], CustomArrayObject[]>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<CustomArrayObject[], CustomArrayObject[]>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1140,7 +1140,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testTupleArray() {
-		MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1167,7 +1167,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testCustomArrayWithTypeVariable() {
-		MapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
+		RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]"));
 
@@ -1178,7 +1178,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(0));
 	}
 	
-	public class GenericArrayClass<T> extends MapFunction<T[], T[]> {
+	public class GenericArrayClass<T> extends RichMapFunction<T[], T[]> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -1207,7 +1207,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testParamertizedCustomObject() {
-		MapFunction<?, ?> function = new MapFunction<MyObject<String>, MyObject<String>>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<MyObject<String>, MyObject<String>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1242,7 +1242,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testInputMismatchExceptions() {
 		
-		MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>, String>() {
+		RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>, String>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1265,7 +1265,7 @@ public class TypeExtractorTest {
 			// right
 		}
 		
-		MapFunction<?, ?> function2 = new MapFunction<StringValue, String>() {
+		RichMapFunction<?, ?> function2 = new RichMapFunction<StringValue, String>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1281,7 +1281,7 @@ public class TypeExtractorTest {
 			// right
 		}
 		
-		MapFunction<?, ?> function3 = new MapFunction<Tuple1<Integer>[], String>() {
+		RichMapFunction<?, ?> function3 = new RichMapFunction<Tuple1<Integer>[], String>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1297,7 +1297,7 @@ public class TypeExtractorTest {
 			// right
 		}
 		
-		MapFunction<?, ?> function4 = new MapFunction<Writable, String>() {
+		RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, String>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1314,7 +1314,7 @@ public class TypeExtractorTest {
 		}
 	}
 	
-	public static class DummyFlatMapFunction<A,B,C,D> extends FlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
+	public static class DummyFlatMapFunction<A,B,C,D> extends RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -1336,7 +1336,7 @@ public class TypeExtractorTest {
 		}
 	}
 
-	public static class MyQueryableMapper<A> extends MapFunction<String, A> implements ResultTypeQueryable<A> {
+	public static class MyQueryableMapper<A> extends RichMapFunction<String, A> implements ResultTypeQueryable<A> {
 		private static final long serialVersionUID = 1L;
 		
 		@SuppressWarnings("unchecked")
@@ -1359,7 +1359,7 @@ public class TypeExtractorTest {
 	
 	@Test
 	public void testTupleWithPrimitiveArray() {
-		MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() {
+		RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
@@ -1382,8 +1382,8 @@ public class TypeExtractorTest {
 	}
 	
 	@Test
-	public void testInterface() {
-		GenericMap<String, Boolean> mapInterface = new GenericMap<String, Boolean>() {
+	public void testFunction() {
+		RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() {
 			
 			@Override
 			public void setRuntimeContext(RuntimeContext t) {
@@ -1392,7 +1392,6 @@ public class TypeExtractorTest {
 			
 			@Override
 			public void open(Configuration parameters) throws Exception {
-				
 			}
 			
 			@Override
@@ -1414,4 +1413,17 @@ public class TypeExtractorTest {
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
 	}
+
+	@Test
+	public void testInterface() {
+		// anonymous MapFunction that overrides only map(); map() itself is never invoked by this test
+		final MapFunction<String, Boolean> plainMapper = new MapFunction<String, Boolean>() {
+			@Override
+			public Boolean map(String value) throws Exception {
+				return null;
+			}
+		};
+		final TypeInformation<?> extracted = TypeExtractor.getMapReturnTypes(plainMapper, BasicTypeInfo.STRING_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, extracted);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8-tests/pom.xml b/flink-java8-tests/pom.xml
new file mode 100644
index 0000000..2587776
--- /dev/null
+++ b/flink-java8-tests/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-java8-tests</artifactId>
+	<name>flink-java8-tests</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies><!-- Flink artifacts (core, test-utils, java) at this project's own version, plus JUnit -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.7</version><!-- NOTE(review): version is pinned here and no scope is declared; confirm against the parent POM's dependency management -->
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+            <plugin>
+                <!-- compile this module at Java 8 source/target level; its tests use lambda expressions -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version><!--$NO-MVN-MAN-VER$-->
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
+                </configuration>
+            </plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal><!-- additionally package this module's test classes as a test jar -->
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId><!-- settings for the unit-test run -->
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level><!-- passed to the tests as system property log.level -->
+					</systemPropertyVariables>
+					<forkMode>once</forkMode><!-- NOTE(review): forkMode is deprecated in newer Surefire releases in favour of forkCount/reuseForks; confirm the plugin version managed by the parent POM -->
+					<argLine>-Xmx1024m</argLine><!-- JVM argument for the forked test JVM: 1024 MB maximum heap -->
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-failsafe-plugin</artifactId><!-- no groupId given: Maven falls back to org.apache.maven.plugins -->
+				<configuration>
+					<systemPropertyVariables>
+						<log.level>WARN</log.level><!-- passed to the tests as system property log.level -->
+					</systemPropertyVariables>
+					<forkMode>always</forkMode><!-- NOTE(review): forkMode is deprecated in newer Failsafe releases in favour of forkCount/reuseForks; confirm the plugin version managed by the parent POM -->
+					<threadCount>1</threadCount>
+					<perCoreThreadCount>false</perCoreThreadCount><!-- threadCount is taken as an absolute number, not multiplied per CPU core -->
+				</configuration>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!-- Eclipse m2e settings only, with no influence on the Maven build itself: tells m2e to ignore the maven-assembly-plugin "single" goal (versions 2.4 and later). -->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-assembly-plugin
+										</artifactId>
+										<versionRange>
+											[2.4,)
+										</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..c417249
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class CoGroupITCase implements Serializable {
+
+	/** Passes when nothing is thrown or the lambda is rejected with UnsupportedLambdaExpressionException. */
+	@Test
+	public void testCoGroupLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<Tuple2<Integer, String>> left = env.fromElements(
+					new Tuple2<Integer, String>(1, "hello"),
+					new Tuple2<Integer, String>(2, "what's"),
+					new Tuple2<Integer, String>(2, "up")
+			);
+			DataSet<Tuple2<Integer, String>> right = env.fromElements(
+					new Tuple2<Integer, String>(1, "not"),
+					new Tuple2<Integer, String>(1, "much"),
+					new Tuple2<Integer, String>(2, "really")
+			);
+			DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0)
+					.with((values1, values2, out) -> {
+						int sum = 0;
+						String conc = "";
+						while (values1.hasNext()) {
+							Tuple2<Integer, String> value = values1.next(); // exactly one next() per hasNext()
+							sum += value.f0;
+							conc += value.f1;
+						}
+						while (values2.hasNext()) {
+							Tuple2<Integer, String> value = values2.next(); // exactly one next() per hasNext()
+							sum += value.f0;
+							conc += value.f1;
+						}
+						out.collect(new Tuple2<Integer, String>(sum, conc)); // emit the aggregate for this key
+					});
+			env.execute();
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Success: lambdas whose types cannot be extracted are expected to be rejected this way
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e); // keep the cause visible in the test report
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..f8d217e
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class CrossITCase implements Serializable {
+
+	/** Passes when nothing is thrown or the lambda is rejected with UnsupportedLambdaExpressionException. */
+	@Test
+	public void testCrossLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Integer, String>> left = env.fromElements(
+					new Tuple2<Integer, String>(1, "hello"),
+					new Tuple2<Integer, String>(2, "what's"),
+					new Tuple2<Integer, String>(2, "up")
+			);
+			DataSet<Tuple2<Integer, String>> right = env.fromElements(
+					new Tuple2<Integer, String>(1, "not"),
+					new Tuple2<Integer, String>(1, "much"),
+					new Tuple2<Integer, String>(2, "really")
+			);
+			DataSet<Tuple2<Integer,String>> joined = left.cross(right)
+					.with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Success: lambdas whose types cannot be extracted are expected to be rejected this way
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e); // keep the cause visible in the test report
+		}
+	}
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..c775425
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends JavaProgramTestBase {
+
+	/** Creates a data set of 21 Tuple3 records; the backing list is shuffled before it is handed to env. */
+	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
+		// long literals carry the upper-case 'L' suffix; a lower-case 'l' is easily misread as the digit 1
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>();
+		data.add(new Tuple3<Integer, Long, String>(1,1L,"Hi"));
+		data.add(new Tuple3<Integer, Long, String>(2,2L,"Hello"));
+		data.add(new Tuple3<Integer, Long, String>(3,2L,"Hello world"));
+		data.add(new Tuple3<Integer, Long, String>(4,3L,"Hello world, how are you?"));
+		data.add(new Tuple3<Integer, Long, String>(5,3L,"I am fine."));
+		data.add(new Tuple3<Integer, Long, String>(6,3L,"Luke Skywalker"));
+		data.add(new Tuple3<Integer, Long, String>(7,4L,"Comment#1"));
+		data.add(new Tuple3<Integer, Long, String>(8,4L,"Comment#2"));
+		data.add(new Tuple3<Integer, Long, String>(9,4L,"Comment#3"));
+		data.add(new Tuple3<Integer, Long, String>(10,4L,"Comment#4"));
+		data.add(new Tuple3<Integer, Long, String>(11,5L,"Comment#5"));
+		data.add(new Tuple3<Integer, Long, String>(12,5L,"Comment#6"));
+		data.add(new Tuple3<Integer, Long, String>(13,5L,"Comment#7"));
+		data.add(new Tuple3<Integer, Long, String>(14,5L,"Comment#8"));
+		data.add(new Tuple3<Integer, Long, String>(15,5L,"Comment#9"));
+		data.add(new Tuple3<Integer, Long, String>(16,6L,"Comment#10"));
+		data.add(new Tuple3<Integer, Long, String>(17,6L,"Comment#11"));
+		data.add(new Tuple3<Integer, Long, String>(18,6L,"Comment#12"));
+		data.add(new Tuple3<Integer, Long, String>(19,6L,"Comment#13"));
+		data.add(new Tuple3<Integer, Long, String>(20,6L,"Comment#14"));
+		data.add(new Tuple3<Integer, Long, String>(21,6L,"Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	private static final int NUM_PROGRAMS = 1; // program ids 1..NUM_PROGRAMS each get their own configuration
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+
+	public FilterITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		// one Configuration per program id, stored under the "ProgramId" key that curProgId reads
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	private static class FilterProgs {
+		/** Runs the program selected by progId, writing to resultPath, and returns the expected output. */
+		public static String runProgram(int progId, String resultPath) throws Exception {
+
+			switch(progId) {
+				case 1: {
+					/*
+					 * Test lambda filter
+					 * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3
+					 */
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+							filter(value -> value.f2.contains("world"));
+					filterDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "3,2,Hello world\n" +
+							"4,3,Hello world, how are you?\n";
+				}
+				default:
+					throw new IllegalArgumentException("Invalid program id");
+			}
+
+		}
+
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..043b4e8
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class FlatJoinITCase implements Serializable {
+
+	/** Passes when nothing is thrown or the lambda is rejected with UnsupportedLambdaExpressionException. */
+	@Test
+	public void testFlatJoinLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Integer, String>> left = env.fromElements(
+					new Tuple2<Integer, String>(1, "hello"),
+					new Tuple2<Integer, String>(2, "what's"),
+					new Tuple2<Integer, String>(2, "up")
+			);
+			DataSet<Tuple2<Integer, String>> right = env.fromElements(
+					new Tuple2<Integer, String>(1, "not"),
+					new Tuple2<Integer, String>(1, "much"),
+					new Tuple2<Integer, String>(2, "really")
+			);
+			DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+					.with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)));
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Success: lambdas whose types cannot be extracted are expected to be rejected this way
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e); // keep the cause visible in the test report
+		}
+	}
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..55f507c
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class FlatMapITCase implements Serializable {
+
+	/** Passes when nothing is thrown or the lambda is rejected with UnsupportedLambdaExpressionException. */
+	@Test
+	public void testFlatMapLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+			DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+			env.execute();
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Success: lambdas whose types cannot be extracted are expected to be rejected this way
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e); // keep the cause visible in the test report
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..494aff6
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class GroupReduceITCase implements Serializable {
+	// Exercises DataSet.reduceGroup with Java 8 lambdas, on an ungrouped and a grouped data set.
+	@Test
+	public void testAllGroupReduceLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+			DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+				// Concatenate every element handed to the function into one output string.
+				StringBuilder conc = new StringBuilder();
+				while (values.hasNext()) {
+					conc.append(values.next());
+				}
+				out.collect(conc.toString());
+			});
+			env.execute();
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Accepted outcome: the API reported that it cannot handle this lambda.
+			return;
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception in all-group-reduce lambda test: " + e);
+		}
+	}
+
+	@Test
+	public void testGroupReduceLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Integer,String>> stringDs = env.fromElements(
+					new Tuple2<Integer,String>(1, "aa"),
+					new Tuple2<Integer,String>(2, "ab"),
+					new Tuple2<Integer,String>(1, "ac"),
+					new Tuple2<Integer,String>(2, "ad")
+			);
+			DataSet<String> concatDs = stringDs
+					.groupBy(0)
+					.reduceGroup((values, out) -> {
+						// Concatenate the String field (f1) of every tuple in the group.
+						StringBuilder conc = new StringBuilder();
+						while (values.hasNext()) {
+							conc.append(values.next().f1);
+						}
+						out.collect(conc.toString());
+					});
+			env.execute();
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Accepted outcome: the API reported that it cannot handle this lambda.
+			return;
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception in group-reduce lambda test: " + e);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..3f4f696
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class JoinITCase implements Serializable {
+	// Exercises an equi-join on tuple field 0 whose join function is a Java 8 lambda.
+	@Test
+	public void testJoinLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Integer, String>> left = env.fromElements(
+				new Tuple2<Integer, String>(1, "hello"),
+				new Tuple2<Integer, String>(2, "what's"),
+				new Tuple2<Integer, String>(2, "up")
+			);
+			DataSet<Tuple2<Integer, String>> right = env.fromElements(
+					new Tuple2<Integer, String>(1, "not"),
+					new Tuple2<Integer, String>(1, "much"),
+					new Tuple2<Integer, String>(2, "really")
+			);
+			DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0)
+					.with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1));
+			// NOTE(review): unlike the sibling lambda tests, env.execute() is never called here - confirm intent.
+		} catch (UnsupportedLambdaExpressionException e) {
+			// Accepted outcome: the API reported that it cannot handle this lambda.
+			return;
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception in join lambda test: " + e);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..3af360b
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+public class MapITCase implements Serializable {
+	// Exercises DataSet.map with a Java 8 lambda instead of a function class.
+	@Test
+	public void testMapLambda() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			// The lambda maps each input string to a copy with every "a" replaced by "b".
+			DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+			DataSet<String> mappedDs = stringDs.map(s -> s.replace("a", "b"));
+			env.execute();
+		}
+		catch (UnsupportedLambdaExpressionException e) {
+			// Accepted outcome: the API reported that it cannot handle this lambda.
+			return;
+		}
+		catch (Exception e) {
+			Assert.fail("Unexpected exception in map lambda test: " + e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..ab27fe4
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ReduceITCase extends JavaProgramTestBase {
+	/** Builds a shuffled, 15-element Tuple5 data set with explicitly declared field types. */
+	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+		// Randomize the element order before the collection is turned into a data set.
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> type = new
+				TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+	// Number of test programs; program ids 1..NUM_PROGRAMS are each run as one parameterized case.
+	private static final int NUM_PROGRAMS = 1;
+	// Id of the program this instance runs, read from the "ProgramId" config entry (-1 if absent).
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+
+	public ReduceITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		// One Configuration per program id, each carrying its id under the "ProgramId" key.
+		LinkedList<Configuration> tConfigs = new LinkedList<>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	private static class ReduceProgs {
+		/** Runs the program with the given id, writing to resultPath, and returns the expected output. */
+		public static String runProgram(int progId, String resultPath) throws Exception {
+
+			switch(progId) {
+				case 1: {
+					/*
+					 * Test reduce with lambda
+					 * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2
+					 */
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+					DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+							.groupBy(4, 0)
+							.reduce((in1, in2) -> {
+								Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+								out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+								return out;
+							});
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1,0,Hallo,1\n" +
+							"2,3,2,Hallo Welt wie,1\n" +
+							"2,2,1,Hallo Welt,2\n" +
+							"3,9,0,P-),2\n" +
+							"3,6,5,BCD,3\n" +
+							"4,17,0,P-),1\n" +
+							"4,17,0,P-),2\n" +
+							"5,11,10,GHI,1\n" +
+							"5,29,0,P-),2\n" +
+							"5,25,0,P-),3\n";
+				}
+				default:
+					throw new IllegalArgumentException("Invalid program id");
+			}
+
+		}
+
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 636c492..34cd232 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -54,7 +54,7 @@ import java.io.IOException;
 /**
  * The base class for all tasks able to participate in an iteration.
  */
-public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
+public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT>
 		implements Terminable
 {
 	private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index d7f3b50..7a77cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -75,7 +75,7 @@ import org.apache.flink.util.MutableObjectIterator;
  *        The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
  *        same as {@code X}
  */
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index 25a6149..2a8325c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -41,7 +41,7 @@ import org.apache.flink.util.Collector;
  * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case
  * this task must be scheduled on the same instance as the head.
  */
-public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 570630f..942e2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
@@ -38,7 +38,7 @@ import org.apache.flink.util.Collector;
  * <p/>
  * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
+public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT>
 		implements PactTaskContext<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationTailPactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index fe70171..d7d63af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 
 	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
 	
@@ -110,7 +110,7 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
 	@Override
 	public void run() throws Exception {
 
-		final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		if (buildSideIndex == 0) {


Mime
View raw message