flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [15/22] [FLINK-701] Several cleanups after SAM refactoring. - Lambda detection compiles on earlier java versions - Add lambda detection test. - Fix JavaDocs
Date Fri, 01 Aug 2014 07:29:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
index 2293b5e..c045508 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
@@ -20,22 +20,14 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays. Operations that produce multiple strictly one result element per input element can also
- * use the {@link RichMapFunction}.
- * <p>
- * The basic syntax for using a FlatMapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FlatMapFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link FlatMapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
@@ -44,16 +36,6 @@ public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
-	 * it into zero, one, or more elements.
-	 * 
-	 * @param value The input value.
-	 * @param out The collector for for emitting result values.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
index eb75f53..801f592 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -27,26 +27,14 @@ import java.util.Iterator;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for group reduce functions. Group reduce functions process groups of elements.
- * They may aggregate them to a single value, or produce multiple result values for each group.
- * <p>
- * For a reduce functions that works incrementally by combining always two elements, see 
- * {@link RichReduceFunction}, called via {@link org.apache.flink.api.java.DataSet#reduce(RichReduceFunction)}.
- * <p>
- * The basic syntax for using a grouped GroupReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction());
- * </blockquote></pre>
- * <p>
- * GroupReduceFunctions may be "combinable", in which case they can pre-reduce partial groups in order to
- * reduce the data volume early. See the {@link #combine(Iterator, Collector)} function for details.
- * <p>
- * Like all functions, the GroupReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link GroupReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
@@ -55,16 +43,6 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * Core method of the reduce function. It is called one per group of elements. If the reducer
-	 * is not grouped, than the entire data set is considered one group.
-	 * 
-	 * @param values The iterator returning the group of values to be reduced.
-	 * @param out The collector to emit the returned values.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
index 7eaf44c..a0c28ee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
@@ -18,10 +18,20 @@
 
 package org.apache.flink.api.java.functions;
 
-
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
+/**
+ * Rich variant of the {@link JoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
 public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
index 54de7d4..f6f5356 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
@@ -20,22 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Map functions. Map functions take elements and transform them,
- * element wise. A Map function always produces a single result element for each input element.
- * Typical applications are parsing elements, converting data types, or projecting out fields.
- * Operations that produce multiple result elements from a single input element can be implemented
- * using the {@link RichFlatMapFunction}.
- * <p>
- * The basic syntax for using a MapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<Y> result = input.map(new MyMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the MapFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link MapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
@@ -44,16 +35,6 @@ public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction impl
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The core method of the MapFunction. Takes an element from the input data set and transforms
-	 * it into another element.
-	 * 
-	 * @param value The input value.
-	 * @return The value produced by the map function from the input value.
-	 * 
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	@Override
 	public abstract OUT map(IN value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
index 35cb392..a63f8dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
@@ -20,27 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Reduce functions. Reduce functions combine groups of elements to
- * a single value, by taking always two elements and combining them into one. Reduce functions
- * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced
- * individually.
- * <p>
- * For a reduce functions that work on an entire group at the same time (such as the 
- * MapReduce/Hadoop-style reduce), see {@link RichGroupReduceFunction}, called via
- * {@link org.apache.flink.api.java.DataSet#reduceGroup(RichGroupReduceFunction)}. In the general case,
- * ReduceFunctions are considered faster, because they allow the system to use hash-based
- * execution strategies.
- * <p>
- * The basic syntax for using a grouped ReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ * Rich variant of the {@link ReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <T> Type of the elements that this function processes.
  */
@@ -48,16 +34,5 @@ public abstract class RichReduceFunction<T> extends AbstractRichFunction impleme
 	
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The core method of the ReduceFunction, combining two values into one value of the same type.
-	 * The reduce function is consecutively applied to all values of a group until only a single value remains.
-	 *
-	 * @param value1 The first value to combine.
-	 * @param value2 The second value to combine.
-	 * @return The combined value of both input values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
 	public abstract T reduce(T value1, T value2) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 89c3334..744893b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -515,7 +515,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (function == null) {
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
-					if (FunctionUtils.isSerializedLambdaFunction(function)) {
+					if (FunctionUtils.isLambdaFunction(function)) {
 						throw new UnsupportedLambdaExpressionException();
 					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index d1e99d6..a24a093 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -134,7 +134,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			if (function == null) {
 				throw new NullPointerException("Cross function must not be null.");
 			}
-			if (FunctionUtils.isSerializedLambdaFunction(function)) {
+			if (FunctionUtils.isLambdaFunction(function)) {
 				throw new UnsupportedLambdaExpressionException();
 			}
 			TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 591551f..7646fa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -29,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -80,7 +80,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
 		
 		final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
-		final FlatCombineFunction<T> combineFunction = new DistinctCombiner<T>();
 
 		String name = function.getClass().getName();
 		
@@ -104,7 +103,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 
 			PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
-							selectorKeys, function, combineFunction, getInputType(), getResultType(), name, input, true);
+							selectorKeys, function, getInputType(), getResultType(), name, input);
 			
 			po.setDegreeOfParallelism(this.getParallelism());
 			
@@ -118,9 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	// --------------------------------------------------------------------------------------------
 	
 	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, FlatCombineFunction<IN> combineFunction,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
-			boolean combinable)
+			Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function,
+			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
 	{
 		@SuppressWarnings("unchecked")
 		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
@@ -131,7 +129,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 
 		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
-				new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
+				new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true);
 		
 		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
 
@@ -144,26 +142,14 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		return reducer;
 	}
 	
+	@Combinable
 	public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<T> values, Collector<T> out)
-				throws Exception {
+		public void reduce(Iterator<T> values, Collector<T> out) {
 			out.collect(values.next());
 		}
 	}
-
-	public static final class DistinctCombiner<T> implements FlatCombineFunction<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void combine(Iterator<T> values, Collector<T> out)
-				throws Exception {
-			out.collect(values.next());
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 7ab0b11..e1424ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -51,9 +51,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 
 	private final Grouping<IN> grouper;
 
-	// reduceFunction is a GroupReduceFunction
-	private boolean richFunction;
-
 	private boolean combinable;
 
 	/**
@@ -176,8 +173,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
 			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+			GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
+					new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setCombinable(combinable);
 			po.setInput(input);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index 3223f4d..200c4de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.DataSet;
  * Grouping is an intermediate step for a transformation on a grouped DataSet.<br/>
  * The following transformation can be applied on Grouping:
  * <ul>
- * 	<li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
- * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}, and</li>
+ * 	<li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
+ * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}, and</li>
  * <li>{@link UnsortedGrouping#aggregate(org.apache.flink.api.java.aggregation.Aggregations, int)}.</li>
  * </ul>
  *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index a07a157..ce0aea7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -467,7 +467,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
-			if (FunctionUtils.isSerializedLambdaFunction(function)) {
+			if (FunctionUtils.isLambdaFunction(function)) {
 				throw new UnsupportedLambdaExpressionException();
 			}
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
@@ -478,10 +478,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
-			if (FunctionUtils.isSerializedLambdaFunction(function)) {
+			if (FunctionUtils.isLambdaFunction(function)) {
 				throw new UnsupportedLambdaExpressionException();
 			}
-			FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
+			FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
 			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 97b2417..767f75a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
  * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
  * The following transformation can be applied on sorted groups:
  * <ul>
- * 	<li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li>
+ * 	<li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)},</li>
  * </ul>
  * 
  * @param <T> The type of the elements of the sorted and grouped DataSet.
@@ -82,7 +82,7 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
 		return new GroupReduceOperator<T, R>(this, reducer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 9e71ba0..87b1454 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -127,7 +127,7 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
 		return new GroupReduceOperator<T, R>(this, reducer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 5e80455..29eb5ed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -39,7 +39,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
 			TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
 	{
-		super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+		super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
 				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
 		
 		super.setCombinable(combinable);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index fa0ca11..e9b5c25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
- * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}.
+ * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
  */
 public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
index c4587fd..3a6c931 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.types.Record;
 
 /**
- * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}.
+ * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CrossOperator}.
  */
 public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
 	
@@ -41,10 +41,6 @@ public abstract class CrossFunction extends AbstractRichFunction implements org.
 	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
 	 *                   decide whether to retry the task execution.
 	 */
-
-	//@Override
-	//public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
-
 	@Override
 	public abstract Record cross(Record first, Record second) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
index dce24a3..cc4f96b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -25,25 +24,13 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
- * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}.
+ * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.record.operators.JoinOperator}.
  * It resembles an equality join of both inputs on their key fields.
  */
 public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> {
 	
 	private static final long serialVersionUID = 1L;
 	
-	/**
-	 * This method must be implemented to provide a user implementation of a join.
-	 * It is called for each two records that share the same key and come from different inputs.
-	 * 
-	 * @param value1 The record that comes from the first input.
-	 * @param value2 The record that comes from the second input.
-	 * @return The result of the join UDF as record
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the combine task and lets the fail-over logic
-	 *                   decide whether to retry the combiner execution.
-	 */
 	@Override
 	public abstract void join(Record value1, Record value2, Collector<Record> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
index 99c945d..b082e2d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -28,6 +27,7 @@ import org.apache.flink.util.Collector;
  * The MapFunction must be extended to provide a mapper implementation
  * By definition, the mapper is called for each individual input record.
  */
+@SuppressWarnings("deprecation")
 public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index 073b11a..a1e6369 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.Collector;
 
 /**
  * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
- * {@link org.apache.flink.api.java.operators.ReduceOperator}.
+ * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
  */
 public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
index 64f70f6..85afa64 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.Record;
  * 
  * @see MapFunction
  */
+@SuppressWarnings("deprecation")
 public class MapOperator extends CollectorMapOperatorBase<Record, Record, MapFunction> implements RecordOperator {
 	
 	private static String DEFAULT_NAME = "<Unnamed Mapper>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index 155bbd1..be872e5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -129,7 +129,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 			assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
 			assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
 			if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
-				WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
+				WrappingFunction<?> wf = (WrappingFunction<?>) solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
 				assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass());
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
new file mode 100644
index 0000000..ec3898e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DistrinctTranslationTest {
+
+	@Test
+	public void testCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<String> input = env.fromElements("1", "2", "1", "3");
+			
+			
+			DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() {
+				public String getKey(String value) { return value; }
+			});
+			
+			op.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
+			Assert.assertTrue(reduceOp.isCombinable());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index c6ad73d..8346d00 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1385,6 +1385,8 @@ public class TypeExtractorTest {
 	public void testFunction() {
 		RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() {
 			
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public void setRuntimeContext(RuntimeContext t) {
 				
@@ -1417,6 +1419,8 @@ public class TypeExtractorTest {
 	@Test
 	public void testInterface() {
 		MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() {
+			private static final long serialVersionUID = 1L;
+			
 			@Override
 			public Boolean map(String record) throws Exception {
 				return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
index c417249..7dd4dea 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class CoGroupITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
index f8d217e..3875bab 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class CrossITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
index c775425..bb04336 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,9 +32,11 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
 public class FilterITCase extends JavaProgramTestBase {
 
+	private static final String EXPECTED_RESULT = "3,2,Hello world\n" +
+													"4,3,Hello world, how are you?\n";
 
 	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
 
@@ -68,15 +68,7 @@ public class FilterITCase extends JavaProgramTestBase {
 		return env.fromCollection(data);
 	}
 
-	private static int NUM_PROGRAMS = 1;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
-	private String expectedResult;
-
-	public FilterITCase(Configuration config) {
-		super(config);
-	}
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -85,58 +77,18 @@ public class FilterITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
+				filter(value -> value.f2.contains("world"));
+		filterDs.writeAsCsv(resultPath);
+		env.execute();
 	}
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-
-		return toParameterList(tConfigs);
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
 	}
-
-	private static class FilterProgs {
-
-		public static String runProgram(int progId, String resultPath) throws Exception {
-
-			switch(progId) {
-				case 1: {
-					/*
-					 * Test lambda filter
-					 * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3
-					 */
-
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env);
-					DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-							filter(value -> value.f2.contains("world"));
-					filterDs.writeAsCsv(resultPath);
-					env.execute();
-
-					// return expected result
-					return "3,2,Hello world\n" +
-							"4,3,Hello world, how are you?\n";
-				}
-				default:
-					throw new IllegalArgumentException("Invalid program id");
-			}
-
-		}
-
-	}
-
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
index 043b4e8..431151e 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class FlatJoinITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
index 55f507c..5cf7fc2 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class FlatMapITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
index 494aff6..a86de1f 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class GroupReduceITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
index 3f4f696..d44d116 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class JoinITCase implements Serializable {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
new file mode 100644
index 0000000..4c8ee23
--- /dev/null
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class LambdaExtractionTest {
+
+	@Test
+	public void testIdentifyLambdas() {
+		try {
+			MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() {
+				@Override
+				public Integer map(String value) { return Integer.parseInt(value); }
+			};
+			
+			MapFunction<?, ?> fromProperClass = new StaticMapper();
+			
+			MapFunction<?, ?> fromDerived = new ToTuple<Integer>() {
+				@Override
+				public Tuple2<Integer, Long> map(Integer value) {
+					return new Tuple2<Integer, Long>(value, 1L);
+				}
+			};
+			
+			MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str);
+			
+			assertFalse(FunctionUtils.isLambdaFunction(anonymousFromInterface));
+			assertFalse(FunctionUtils.isLambdaFunction(anonymousFromClass));
+			assertFalse(FunctionUtils.isLambdaFunction(fromProperClass));
+			assertFalse(FunctionUtils.isLambdaFunction(fromDerived));
+			assertTrue(FunctionUtils.isLambdaFunction(lambda));
+			assertTrue(FunctionUtils.isLambdaFunction(STATIC_LAMBDA));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static class StaticMapper implements MapFunction<String, Integer> {
+
+		@Override
+		public Integer map(String value) { return Integer.parseInt(value); }
+	}
+	
+	public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> {
+
+		@Override
+		public Tuple2<T, Long> map(T value) throws Exception;
+	}
+	
+	private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index 3af360b..5e9f732 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public class MapITCase implements Serializable{
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
index ab27fe4..1a34814 100644
--- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
+++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -36,9 +36,20 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
 public class ReduceITCase extends JavaProgramTestBase {
 
+	private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" +
+			"2,3,2,Hallo Welt wie,1\n" +
+			"2,2,1,Hallo Welt,2\n" +
+			"3,9,0,P-),2\n" +
+			"3,6,5,BCD,3\n" +
+			"4,17,0,P-),1\n" +
+			"4,17,0,P-),2\n" +
+			"5,11,10,GHI,1\n" +
+			"5,29,0,P-),2\n" +
+			"5,25,0,P-),3\n";
+	
 	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
 
 		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
@@ -71,17 +82,9 @@ public class ReduceITCase extends JavaProgramTestBase {
 
 		return env.fromCollection(data, type);
 	}
-
-	private static int NUM_PROGRAMS = 1;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
+	
 	private String resultPath;
-	private String expectedResult;
-
-	public ReduceITCase(Configuration config) {
-		super(config);
-	}
-
+	
 	@Override
 	protected void preSubmit() throws Exception {
 		resultPath = getTempDirPath("result");
@@ -89,72 +92,23 @@ public class ReduceITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
+				.groupBy(4, 0)
+				.reduce((in1, in2) -> {
+					Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
+					out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+					return out;
+				});
+
+		reduceDs.writeAsCsv(resultPath);
+		env.execute();
 	}
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-
-		return toParameterList(tConfigs);
-	}
-
-	private static class ReduceProgs {
-
-		public static String runProgram(int progId, String resultPath) throws Exception {
-
-			switch(progId) {
-				case 1: {
-					/*
-					 * Test reduce with lambda
-					 * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2
-					 */
-
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-					DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env);
-					DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds
-							.groupBy(4, 0)
-							.reduce((in1, in2) -> {
-								Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-								out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
-								return out;
-							});
-
-					reduceDs.writeAsCsv(resultPath);
-					env.execute();
-
-					// return expected result
-					return "1,1,0,Hallo,1\n" +
-							"2,3,2,Hallo Welt wie,1\n" +
-							"2,2,1,Hallo Welt,2\n" +
-							"3,9,0,P-),2\n" +
-							"3,6,5,BCD,3\n" +
-							"4,17,0,P-),1\n" +
-							"4,17,0,P-),2\n" +
-							"5,11,10,GHI,1\n" +
-							"5,29,0,P-),2\n" +
-							"5,25,0,P-),3\n";
-				}
-				default:
-					throw new IllegalArgumentException("Invalid program id");
-			}
-
-		}
-
+		compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index 1db3524..9ff7181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
 import java.io.IOException;
@@ -28,13 +27,12 @@ import org.apache.flink.util.Collector;
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
  * <p/>
- * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates.
+ * The records are written to a HashTable hash table to allow in-memory point updates.
  * <p/>
  * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
  * is for example the case when a solution set update happens directly after a solution set join. If this assumption
  * doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
- *
- * @see {SolutionSetUpdateOutputCollector}
+
  */
 public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index 17670f1..89789c35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
  * <p/>
- * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates.
+ * The records are written to a HashTable hash table to allow in-memory point updates.
  * <p/>
  * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
  * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 34cd232..636c492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -54,7 +54,7 @@ import java.io.IOException;
 /**
  * The base class for all tasks able to participate in an iteration.
  */
-public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT>
+public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
 		implements Terminable
 {
 	private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index 7a77cff..797bbb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 import java.io.IOException;
@@ -25,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -57,7 +56,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * The head is responsible for coordinating an iteration and can run a
  * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read
  * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
- * the input, it will send {@link EndOfSuperstepEvent} events to its outputs. It must also be connected to a
+ * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
  * synchronization task and after each superstep, it will wait
  * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
  * their iteration. Starting with
@@ -75,7 +74,7 @@ import org.apache.flink.util.MutableObjectIterator;
  *        The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
  *        same as {@code X}
  */
-public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index 2a8325c..c23eae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -32,16 +31,16 @@ import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
 
 /**
- * An intermediate iteration task, which runs a {@link PactDriver} inside.
+ * An intermediate iteration task, which runs a Driver}inside.
  * <p/>
  * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore
  * intermediate tasks can also update the iteration state, either the workset or the solution set.
  * <p/>
  * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case
+ * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the solution set. In this case
  * this task must be scheduled on the same instance as the head.
  */
-public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index a06ef5d..c44f443 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -44,9 +44,9 @@ import org.apache.flink.types.Value;
 import com.google.common.base.Preconditions;
 
 /**
- * The task responsible for synchronizing all iteration heads, implemented as an {@link AbstractOutputTask}. This task
+ * The task responsible for synchronizing all iteration heads, implemented as an output task. This task
  * will never see any data.
- * In each superstep, it simply waits until it has receiced a {@link WorkerDoneEvent} from each head and will send back
+ * In each superstep, it simply waits until it has received a {@link WorkerDoneEvent} from each head and will send back
  * an {@link AllWorkersDoneEvent} to signal that the next superstep can begin.
  */
 public class IterationSynchronizationSinkTask extends AbstractInvokable implements Terminable {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 942e2f6..90d732c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
@@ -29,16 +28,16 @@ import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.util.Collector;
 
 /**
- * An iteration tail, which runs a {@link PactDriver} inside.
+ * An iteration tail, which runs a driver inside.
  * <p/>
  * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -OR- a {@link MutableHashTable} for the solution set. Therefore this
+ * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
  * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
  * and the solution set.
  * <p/>
  * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
-public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT>
+public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
 		implements PactTaskContext<S, OT> {
 
 	private static final Log log = LogFactory.getLog(IterationTailPactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
deleted file mode 100644
index 78cf1f5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.ExecutionContext;
-import org.apache.flink.runtime.execution.Environment;
-
-
-/**
- * Default implementation of the {@link ExecutionContext} that delegates the calls to the nephele task
- * environment.
- *
- */
-public class RuntimeExecutionContext implements ExecutionContext
-{
-	private final Environment env;
-
-	public RuntimeExecutionContext(Environment env) {
-		this.env = env;
-	}
-
-
-	@Override
-	public String getTaskName() {
-		return this.env.getTaskName();
-	}
-
-
-	@Override
-	public int getNumberOfSubtasks() {
-		return this.env.getCurrentNumberOfSubtasks();
-	}
-
-
-	@Override
-	public int getSubtaskIndex() {
-		return this.env.getIndexInSubtaskGroup() + 1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index aaad08c..99a59b1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -16,11 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.operators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -35,8 +32,6 @@ import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -47,17 +42,12 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.LinkedList;
 
-/**
- */
 @RunWith(Parameterized.class)
-//@Ignore("Test needs to be adapted to new cross signature")
 public class CrossITCase extends RecordAPITestBase {
 
-	private static final Log LOG = LogFactory.getLog(CrossITCase.class);
-
-	String leftInPath = null;
-	String rightInPath = null;
-	String resultPath = null;
+	private String leftInPath = null;
+	private String rightInPath = null;
+	private String resultPath = null;
 
 	public CrossITCase(Configuration testConfig) {
 		super(testConfig);
@@ -112,8 +102,6 @@ public class CrossITCase extends RecordAPITestBase {
 			int key1 = Integer.parseInt(string.toString());
 			string = record2.getField(0, string);
 			int key2 = Integer.parseInt(string.toString());
-			
-			LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }");
 
 			string.setValue((key1 + key2 + 2) + "");
 			integer.setValue(val2 - val1 + 1);


Mime
View raw message