flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [2/2] flink git commit: [FLINK-3521] Make Iterable part of method signature for WindowFunction
Date Fri, 26 Feb 2016 23:09:48 GMT
[FLINK-3521] Make Iterable part of method signature for WindowFunction

This closes #1723


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27b5c49e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27b5c49e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27b5c49e

Branch: refs/heads/master
Commit: 27b5c49e76f58992fd5575959a7dea7088505e12
Parents: 64519e1
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Feb 26 15:19:50 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Sat Feb 27 00:07:05 2016 +0100

----------------------------------------------------------------------
 .../ml/IncrementalLearningSkeleton.java         |  2 +-
 .../GroupedProcessingTimeWindowExample.java     |  2 +-
 .../api/datastream/AllWindowedStream.java       |  8 +--
 .../api/datastream/CoGroupedStreams.java        |  2 +-
 .../api/datastream/WindowedStream.java          | 39 +++++------
 .../functions/windowing/AllWindowFunction.java  |  5 +-
 .../windowing/FoldApplyAllWindowFunction.java   |  5 +-
 .../windowing/FoldApplyWindowFunction.java      |  5 +-
 .../windowing/PassThroughAllWindowFunction.java |  6 +-
 .../windowing/PassThroughWindowFunction.java    |  6 +-
 .../windowing/ReduceApplyAllWindowFunction.java |  6 +-
 .../windowing/ReduceApplyWindowFunction.java    |  6 +-
 .../ReduceIterableAllWindowFunction.java        |  2 +-
 .../windowing/ReduceIterableWindowFunction.java |  2 +-
 .../api/functions/windowing/WindowFunction.java |  3 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  8 +--
 ...ccumulatingProcessingTimeWindowOperator.java |  6 +-
 .../EvictingNonKeyedWindowOperator.java         |  2 +-
 .../windowing/EvictingWindowOperator.java       |  6 +-
 .../windowing/NonKeyedWindowOperator.java       |  4 +-
 .../operators/windowing/WindowOperator.java     | 12 ++--
 .../InternalIterableWindowFunction.java         | 72 +++++++++++++++++++
 .../InternalSingleValueWindowFunction.java      | 74 ++++++++++++++++++++
 .../functions/InternalWindowFunction.java       | 47 +++++++++++++
 .../flink/streaming/api/DataStreamTest.java     |  2 +-
 .../operators/FoldApplyWindowFunctionTest.java  |  6 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 10 +--
 .../windowing/AllWindowTranslationTest.java     |  6 +-
 .../windowing/EvictingWindowOperatorTest.java   |  7 +-
 .../windowing/TimeWindowTranslationTest.java    |  6 +-
 .../operators/windowing/WindowOperatorTest.java | 16 +++--
 .../windowing/WindowTranslationTest.java        |  7 +-
 .../streaming/api/scala/AllWindowedStream.scala | 43 ++++++++----
 .../streaming/api/scala/WindowedStream.scala    | 47 +++++++++----
 .../api/scala/function/AllWindowFunction.scala  | 45 ++++++++++++
 .../api/scala/function/WindowFunction.scala     | 47 +++++++++++++
 .../api/scala/AllWindowTranslationTest.scala    | 12 ++--
 .../api/scala/WindowTranslationTest.scala       | 12 ++--
 .../EventTimeAllWindowCheckpointingITCase.java  | 22 ++++--
 .../EventTimeWindowCheckpointingITCase.java     | 24 +++++--
 .../WindowCheckpointingITCase.java              |  4 +-
 41 files changed, 498 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index acbc5d6..4108485 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -167,7 +167,7 @@ public class IncrementalLearningSkeleton {
 	/**
 	 * Builds up-to-date partial models on new training data.
 	 */
-	public static class PartialModelBuilder implements AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> {
+	public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
 		protected Double[] buildPartialModel(Iterable<Integer> values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 196b73e..f08069b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -104,7 +104,7 @@ public class GroupedProcessingTimeWindowExample {
 		}
 	}
 
-	public static class SummingWindowFunction implements WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> {
+	public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
 
 		@Override
 		public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 56640d3..6b32880 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -222,11 +221,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
 		@SuppressWarnings("unchecked, rawtypes")
-		TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class);
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, AllWindowFunction.class, true, true, iterTypeInfo, null, false);
+				function, AllWindowFunction.class, true, true, getInputType(), null, false);
 
 		return apply(function, resultType);
 	}
@@ -242,7 +240,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index e921940..713433c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -556,7 +556,7 @@ public class CoGroupedStreams<T1, T2> {
 
 	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
 			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
-			implements WindowFunction<Iterable<TaggedUnion<T1, T2>>, T, KEY, W> {
+			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
 		
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2ced99d..5c92fe0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -52,6 +51,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -218,11 +219,9 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function) {
-		@SuppressWarnings("unchecked, rawtypes")
-		TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class);
+	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) {
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, iterTypeInfo, null, false);
+				function, WindowFunction.class, true, true, getInputType(), null, false);
 
 		return apply(function, resultType);
 	}
@@ -240,7 +239,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType) {
+	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
@@ -270,7 +269,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					function,
+					new InternalIterableWindowFunction<>(function),
 					trigger,
 					evictor);
 
@@ -285,7 +284,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					function,
+					new InternalIterableWindowFunction<>(function),
 					trigger);
 		}
 
@@ -350,13 +349,13 @@ public class WindowedStream<T, K, W extends Window> {
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
 			operator = new EvictingWindowOperator<>(windowAssigner,
-				windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-				keySel,
-				input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-				stateDesc,
-				new ReduceApplyWindowFunction<>(reduceFunction, function),
-				trigger,
-				evictor);
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
+					trigger,
+					evictor);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -370,7 +369,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
-					function,
+					new InternalSingleValueWindowFunction<>(function),
 					trigger);
 		}
 
@@ -441,7 +440,7 @@ public class WindowedStream<T, K, W extends Window> {
 				keySel,
 				input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 				stateDesc,
-				new FoldApplyWindowFunction<>(initialValue, foldFunction, function),
+				new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
 				trigger,
 				evictor);
 
@@ -458,7 +457,7 @@ public class WindowedStream<T, K, W extends Window> {
 				keySel,
 				input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 				stateDesc,
-				function,
+				new InternalSingleValueWindowFunction<>(function),
 				trigger);
 		}
 
@@ -694,7 +693,7 @@ public class WindowedStream<T, K, W extends Window> {
 			}
 			else if (function instanceof WindowFunction) {
 				@SuppressWarnings("unchecked")
-				WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(),
@@ -726,7 +725,7 @@ public class WindowedStream<T, K, W extends Window> {
 			}
 			else if (function instanceof WindowFunction) {
 				@SuppressWarnings("unchecked")
-				WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(),

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
index 62e86ca..c497b4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
@@ -30,9 +30,10 @@ import java.io.Serializable;
  *
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
  */
 @Public
-public interface AllWindowFunction<IN, OUT,  W extends Window> extends Function, Serializable {
+public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
 
 	/**
 	 * Evaluates the window and outputs none or several elements.
@@ -43,5 +44,5 @@ public interface AllWindowFunction<IN, OUT,  W extends Window> extends Function,
 	 *
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
 	 */
-	void apply(W window, IN values, Collector<OUT> out) throws Exception;
+	void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index 76fd562..a5bc0a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -33,11 +33,12 @@ import org.apache.flink.util.Collector;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 
 @Internal
 public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
 	extends WrappingFunction<AllWindowFunction<ACC, ACC, W>>
-	implements AllWindowFunction<Iterable<T>, ACC, W>, OutputTypeConfigurable<ACC> {
+	implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -75,7 +76,7 @@ public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
 			result = foldFunction.fold(result, val);
 		}
 
-		wrappedFunction.apply(window, result, out);
+		wrappedFunction.apply(window, Collections.singletonList(result), out);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 40e8830..756a683 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -33,11 +33,12 @@ import org.apache.flink.util.Collector;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 
 @Internal
 public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
 	extends WrappingFunction<WindowFunction<ACC, ACC, K, W>>
-	implements WindowFunction<Iterable<T>, ACC, K, W>, OutputTypeConfigurable<ACC> {
+	implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -75,7 +76,7 @@ public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
 			result = foldFunction.fold(result, val);
 		}
 
-		wrappedFunction.apply(key, window, result, out);
+		wrappedFunction.apply(key, window, Collections.singletonList(result), out);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
index 3e3ffca..4435644 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
@@ -27,7 +27,9 @@ public class PassThroughAllWindowFunction<W extends Window, T> implements AllWin
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void apply(W window, T input, Collector<T> out) throws Exception {
-		out.collect(input);
+	public void apply(W window, Iterable<T> input, Collector<T> out) throws Exception {
+		for (T in: input) {
+			out.collect(in);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
index 21709b9..319acb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
@@ -27,7 +27,9 @@ public class PassThroughWindowFunction<K, W extends Window, T> implements Window
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void apply(K k, W window, T input, Collector<T> out) throws Exception {
-		out.collect(input);
+	public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception {
+		for (T in: input) {
+			out.collect(in);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
index ce1615b..5b8dd70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import java.util.Collections;
+
 @Internal
 public class ReduceApplyAllWindowFunction<W extends Window, T, R>
 	extends WrappingFunction<AllWindowFunction<T, R, W>>
-	implements AllWindowFunction<Iterable<T>, R, W> {
+	implements AllWindowFunction<T, R, W> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -51,6 +53,6 @@ public class ReduceApplyAllWindowFunction<W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.apply(window, curr, out);
+		windowFunction.apply(window, Collections.singletonList(curr), out);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
index 75ea2d2..f896282 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import java.util.Collections;
+
 @Internal
 public class ReduceApplyWindowFunction<K, W extends Window, T, R>
 	extends WrappingFunction<WindowFunction<T, R, K, W>>
-	implements WindowFunction<Iterable<T>, R, K, W> {
+	implements WindowFunction<T, R, K, W> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -51,6 +53,6 @@ public class ReduceApplyWindowFunction<K, W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.apply(k, window, curr, out);
+		windowFunction.apply(k, window, Collections.singletonList(curr), out);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
index a3b35ae..8ec5809 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 @Internal
-public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<Iterable<T>, T, W> {
+public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<T, T, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final ReduceFunction<T> reduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
index e296411..afb0219 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 @Internal
-public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<Iterable<T>, T, K, W> {
+public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final ReduceFunction<T> reduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
index 83ef18e..154fe88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -31,6 +31,7 @@ import java.io.Serializable;
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
  */
 @Public
 public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
@@ -45,5 +46,5 @@ public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function
 	 * 
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
 	 */
-	void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+	void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index b830789..9b353fe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
-	private final WindowFunction<Iterable<Type>, Result, Key, Window> function;
+	private final WindowFunction<Type, Result, Key, Window> function;
 
 	/**
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	// ------------------------------------------------------------------------
 	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Iterable<Type>, Result, Key, Window> function) {
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
 	}
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-		private final WindowFunction<Iterable<Type>, Result, Key, Window> function;
+		private final WindowFunction<Type, Result, Key, Window> function;
 		
 		private final UnionIterator<Type> unionIterator;
 		
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private Key currentKey;
 		
 
-		WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
+		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
 								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
 			this.function = function;
 			this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 2f0d4fe..9ea2949 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -33,13 +33,13 @@ import java.util.ArrayList;
 
 @Internal
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
 	
 	public AccumulatingProcessingTimeWindowOperator(
-			WindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function,
+			WindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			TypeSerializer<IN> valueSerializer,
@@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	@Override
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
-		WindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (WindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
+		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 510ebb2..221367d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -49,7 +49,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
+			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index cfab3d5..16ca488 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -27,12 +27,12 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
@@ -48,7 +48,7 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
@@ -65,7 +65,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		KeySelector<IN, K> keySelector,
 		TypeSerializer<K> keySerializer,
 		StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
-		WindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
+		InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
 		Trigger<? super IN, ? super W> trigger,
 		Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index e42d7b4..95feadc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -77,7 +77,7 @@ import static java.util.Objects.requireNonNull;
  */
 @Internal
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<Iterable<IN>, OUT, W>>
+		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
@@ -146,7 +146,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
+			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 289492b..9b7b347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -34,7 +34,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -47,6 +46,7 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
@@ -74,7 +74,7 @@ import static java.util.Objects.requireNonNull;
  * <p>
  * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
  * the contents of the pane should be processed to emit results. When a trigger fires,
- * the given {@link WindowFunction} is invoked to produce the results that are emitted for
+ * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
  * the pane to which the {@code Trigger} belongs.
  *
  * <p>
@@ -83,12 +83,12 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <K> The type of key returned by the {@code KeySelector}.
  * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
  */
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
-	extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
+	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
 	implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
@@ -126,7 +126,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// ------------------------------------------------------------------------
 
 	/**
-	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
+	 * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp.
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
@@ -162,7 +162,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		KeySelector<IN, K> keySelector,
 		TypeSerializer<K> keySerializer,
 		StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor,
-		WindowFunction<ACC, OUT, K, W> windowFunction,
+		InternalWindowFunction<ACC, OUT, K, W> windowFunction,
 		Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
new file mode 100644
index 0000000..32318ea
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
+	private static final long serialVersionUID = 1L;
+
+	protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+
+	public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		this.wrappedFunction = wrappedFunction;
+	}
+
+	@Override
+	public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		wrappedFunction.apply(key, window, input, out);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		FunctionUtils.openFunction(this.wrappedFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.wrappedFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
new file mode 100644
index 0000000..fd10e5c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<IN, OUT, KEY, W> implements RichFunction {
+	private static final long serialVersionUID = 1L;
+
+	protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+
+	public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
+		this.wrappedFunction = wrappedFunction;
+	}
+
+	@Override
+	public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception {
+		wrappedFunction.apply(key, window, Collections.singletonList(input), out);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		FunctionUtils.openFunction(this.wrappedFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(this.wrappedFunction);
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
new file mode 100644
index 0000000..e75f3be
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Internal interface for functions that are evaluated over keyed (grouped) windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ */
+public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> implements Function, Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Evaluates the window and outputs none or several elements.
+	 *
+	 * @param key The key for which this window is evaluated.
+	 * @param window The window that is being evaluated.
+	 * @param input The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public abstract void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 502198c..7a4d6f8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -518,7 +518,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		DataStream<String> window = map
 				.windowAll(GlobalWindows.create())
 				.trigger(PurgingTrigger.of(CountTrigger.of(5)))
-				.apply(new AllWindowFunction<Iterable<Tuple2<Integer, String>>, String, GlobalWindow>() {
+				.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
 					@Override
 					public void apply(GlobalWindow window,
 							Iterable<Tuple2<Integer, String>> values,

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index b8d57a6..0b0ab9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -70,9 +70,11 @@ public class FoldApplyWindowFunctionTest {
 				@Override
 				public void apply(Integer integer,
 					TimeWindow window,
-					Integer input,
+					Iterable<Integer> input,
 					Collector<Integer> out) throws Exception {
-					out.collect(input);
+					for (Integer in: input) {
+						out.collect(in);
+					}
 				}
 			}
 		);

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index b6e51c6..dff1184 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -67,7 +67,7 @@ import static org.junit.Assert.*;
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final WindowFunction<Iterable<String>, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
+	private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -79,8 +79,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> validatingIdentityFunction =
-			new WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow>()
+	private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new WindowFunction<Integer, Integer, Integer, TimeWindow>()
 	{
 		@Override
 		public void apply(Integer key,
@@ -723,7 +723,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static class FailingFunction implements WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
+	private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		private final int failAfterElements;
 		
@@ -751,7 +751,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static class StatefulFunction extends RichWindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
+	private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
 		
 		// we use a concurrent map here even though there is no concurrency, to
 		// get "volatile" style access to entries

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 0583290..42f452c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -76,7 +76,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() {
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -123,7 +123,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
-				.apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() {
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -172,7 +172,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() {
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 2f1dce5..6af7ac4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -70,7 +71,7 @@ public class EvictingWindowOperatorTest {
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
-				new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
 				CountTrigger.of(WINDOW_SLIDE),
 				CountEvictor.of(WINDOW_SIZE));
 
@@ -141,7 +142,7 @@ public class EvictingWindowOperatorTest {
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
-			new RichSumReducer<GlobalWindow>(closeCalled),
+			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(WINDOW_SLIDE),
 			CountEvictor.of(WINDOW_SIZE));
 
@@ -208,7 +209,7 @@ public class EvictingWindowOperatorTest {
 		}
 	}
 
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> {
+	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index a5a8df3..f214941 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -74,7 +74,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
 				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
-				.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -117,7 +117,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window2 = source
 			.keyBy(0)
 			.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
-			.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+			.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 				private static final long serialVersionUID = 1L;
 
 				@Override
@@ -165,7 +165,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS))
-				.apply(new AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow>() {
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index c1111a0..a1f08ad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -41,6 +41,8 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -140,7 +142,7 @@ public class WindowOperatorTest {
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
-				new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(),
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create());
 
 		operator.setInputType(inputType, new ExecutionConfig());
@@ -176,7 +178,7 @@ public class WindowOperatorTest {
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
-			new RichSumReducer<TimeWindow>(),
+			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 			EventTimeTrigger.create());
 
 		operator.setInputType(inputType, new ExecutionConfig());
@@ -271,7 +273,7 @@ public class WindowOperatorTest {
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
-				new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(),
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
 				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -306,7 +308,7 @@ public class WindowOperatorTest {
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 			stateDesc,
-			new RichSumReducer<TimeWindow>(),
+			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
 			EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -344,7 +346,7 @@ public class WindowOperatorTest {
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
-				new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(),
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
 				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -434,7 +436,7 @@ public class WindowOperatorTest {
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				stateDesc,
-				new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(),
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
@@ -497,7 +499,7 @@ public class WindowOperatorTest {
 	}
 
 
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> {
+	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
 		private static final long serialVersionUID = 1L;
 
 		private boolean openCalled = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index c57da8a..30bb840 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -104,7 +104,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
 				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -112,7 +112,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
-
 					}
 				});
 
@@ -153,7 +152,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.keyBy(0)
 				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
-				.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -204,7 +203,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index e36542e..3c91529 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -179,10 +180,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @return The data stream that is the result of applying the window function to the window.
    */
   def apply[R: TypeInformation](
-      function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
+      function: AllWindowFunction[T, R, W]): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
+    val javaFunction = new JAllWindowFunction[T, R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
@@ -205,7 +206,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     val cleanedFunction = clean(function)
-    val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
+    val applyFunction = new JAllWindowFunction[T, R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
@@ -228,8 +229,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       preAggregator: ReduceFunction[T],
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
 
+    val cleanedFunction = clean(function)
+    val applyFunction = new JAllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
+
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType))
+    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType))
   }
 
   /**
@@ -245,7 +253,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (W, T, Collector[R]) => Unit): DataStream[R] = {
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -259,9 +267,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new AllWindowFunction[T, R, W] {
-      def apply(window: W, input: T, out: Collector[R]): Unit = {
-        cleanApply(window, input, out)
+    val applyFunction = new JAllWindowFunction[T, R, W] {
+      def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(window, input.asScala, out)
       }
     }
     
@@ -285,11 +293,18 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       initialValue: R,
       preAggregator: FoldFunction[T, R],
       function: AllWindowFunction[R, R, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val applyFunction = new JAllWindowFunction[R, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
+        cleanedFunction(window, elements.asScala, out)
+      }
+    }
     
     asScalaStream(javaStream.apply(
       initialValue,
       clean(preAggregator),
-      clean(function),
+      applyFunction,
       implicitly[TypeInformation[R]]))
   }
 
@@ -308,7 +323,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: (R, T) => R,
-      function: (W, R, Collector[R]) => Unit): DataStream[R] = {
+      function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -322,9 +337,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new AllWindowFunction[R, R, W] {
-      def apply(window: W, input: R, out: Collector[R]): Unit = {
-        cleanApply(window, input, out)
+    val applyFunction = new JAllWindowFunction[R, R, W] {
+      def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
+        cleanApply(window, input.asScala, out)
       }
     }
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]

http://git-wip-us.apache.org/repos/asf/flink/blob/27b5c49e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 53f033c..b7f9e00 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -182,10 +183,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @return The data stream that is the result of applying the window function to the window.
    */
   def apply[R: TypeInformation](
-      function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
+      function: WindowFunction[T, R, K, W]): DataStream[R] = {
     
     val cleanFunction = clean(function)
-    val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
+    val javaFunction = new JWindowFunction[T, R, K, W] {
       def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = {
         cleanFunction.apply(key, window, input.asScala, out)
       }
@@ -212,7 +213,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     }
 
     val cleanedFunction = clean(function)
-    val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
+    val applyFunction = new JWindowFunction[T, R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(key, window, elements.asScala, out)
       }
@@ -235,8 +236,16 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
 
+    val cleanedFunction = clean(function)
+
+    val applyFunction = new JWindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanedFunction.apply(key, window, elements.asScala, out)
+      }
+    }
+
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType))
+    asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType))
   }
 
   /**
@@ -252,7 +261,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
-      function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
     
     if (function == null) {
       throw new NullPointerException("Reduce function must not be null.")
@@ -267,9 +276,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new WindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, input: T, out: Collector[R]): Unit = {
-        cleanApply(key, window, input, out)
+    val applyFunction = new JWindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(key, window, input.asScala, out)
       }
     }
     
@@ -292,11 +301,19 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       initialValue: R,
       foldFunction: FoldFunction[T, R],
       function: WindowFunction[R, R, K, W]): DataStream[R] = {
-    
+
+    val cleanedFunction = clean(function)
+
+    val applyFunction = new JWindowFunction[R, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
+        cleanedFunction.apply(key, window, elements.asScala, out)
+      }
+    }
+
     asScalaStream(javaStream.apply(
       initialValue,
       clean(foldFunction),
-      clean(function),
+      applyFunction,
       implicitly[TypeInformation[R]]))
   }
 
@@ -314,7 +331,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: (R, T) => R,
-      function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {
+      function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
     
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -329,9 +346,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     }
 
     val cleanApply = clean(function)
-    val applyFunction = new WindowFunction[R, R, K, W] {
-      def apply(key: K, window: W, input: R, out: Collector[R]): Unit = {
-        cleanApply(key, window, input, out)
+    val applyFunction = new JWindowFunction[R, R, K, W] {
+      def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
+        cleanApply(key, window, input.asScala, out)
       }
     }
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]


Mime
View raw message