flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5157] [streaming] Introduce ProcessAllWindowFunction
Date Tue, 28 Feb 2017 13:13:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master 87b907736 -> 788b83921


http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index b6c1618..34eac9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.datastream.AllWindowedStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -383,6 +384,119 @@ public class AllWindowTranslationTest {
 		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(reducer, new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.reduce(
+						reducer,
+						new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+							@Override
+							public void process(
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	/**
 	 * Test for the deprecated .apply(Reducer, WindowFunction).
 	 */
@@ -540,6 +654,226 @@ public class AllWindowTranslationTest {
 				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	public void testAggregateWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(new DummyAggregationFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(
+						new DummyAggregationFunction(),
+						new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+							@Override
+							public void process(
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	// ------------------------------------------------------------------------
+	//  process() translation tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  fold() translation tests
 	// ------------------------------------------------------------------------
@@ -666,6 +1000,117 @@ public class AllWindowTranslationTest {
 
 	@Test
 	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessAllWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public void testFoldWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.fold(
+						new Tuple3<>("", "", 1),
+						new DummyFolder(),
+						new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+							@Override
+							public void process(
+									Context context,
+									Iterable<Tuple3<String, String, Integer>> elements,
+									Collector<Tuple3<String, String, Integer>> out) throws Exception {
+								for (Tuple3<String, String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
 	public void testApplyWithPreFolderEventTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index cf062fc..694353c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,20 +18,19 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaFoldFunction, ScalaProcessAllWindowFunctionWrapper, ScalaReduceFunction}
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
-
 import org.apache.flink.util.Preconditions.checkNotNull
 
 /**
@@ -199,6 +198,64 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
   }
 
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window. The output of the window function is interpreted as a regular
+    * non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    val cleanedReducer = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window. The output of the window function is interpreted as a regular
+    * non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
+  }
+
   // --------------------------- aggregate() ----------------------------------
 
   /**
@@ -257,6 +314,39 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
 
   /**
    * Applies the given window function to each window. The window function is called for each
+   * evaluation of a window of this non-keyed stream. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given aggregation function.
+   *
+   * @param preAggregator The aggregate function used to pre-aggregate arriving elements.
+   * @param windowFunction The process window function applied to the aggregation result.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+      (preAggregator: AggregateFunction[T, ACC, V],
+       windowFunction: ProcessAllWindowFunction[V, R, W]): DataStream[R] = {
+
+    checkNotNull(preAggregator, "AggregationFunction must not be null")
+    checkNotNull(windowFunction, "Window function must not be null")
+
+    val aggregator = clean(preAggregator)
+    // The wrapper exposes the Scala function as a Java ProcessAllWindowFunction.
+    val wrappedFunction =
+      new ScalaProcessAllWindowFunctionWrapper[V, R, W](clean(windowFunction))
+
+    // Type information is taken from the context bounds in the order ACC, V, R.
+    asScalaStream(javaStream.aggregate(
+      aggregator,
+      wrappedFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[V]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
    * evaluation of the window. The output of the window function is
    * interpreted as a regular non-windowed stream.
    *
@@ -367,6 +457,37 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     *
     * @param initialValue Initial value of the fold
     * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: FoldFunction[T, ACC],
+      windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+    val folder = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    // The wrapper exposes the Scala function as a Java ProcessAllWindowFunction.
+    val wrappedFunction =
+      new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanedWindowFunction)
+
+    // Accumulator and result type information come from the implicit context bounds.
+    val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.fold(initialValue, folder, wrappedFunction, accType, resultType))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
@@ -393,6 +514,42 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType))
   }
 
+  /**
+    * Applies the given process window function to each window of this non-keyed stream. The
+    * window function is called once for each evaluation of a window. The output of the window
+    * function is interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  @PublicEvolving
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: (ACC, T) => ACC,
+      windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+    // The pre-aggregator here is a fold function, so the message names it as such.
+    // checkNotNull raises a NullPointerException, like a hand-written null check would.
+    checkNotNull(preAggregator, "Fold function must not be null.")
+    checkNotNull(windowFunction, "WindowApply function must not be null.")
+
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    // Adapt the Scala lambda and the Scala window function to their Java counterparts.
+    val folder = new ScalaFoldFunction[T, ACC](cleanFolder)
+    val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction)
+
+    // Accumulator and result types come from the implicit TypeInformation context bounds.
+    val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType))
+  }
+
   // ---------------------------- apply() -------------------------------------
 
   /**
@@ -403,6 +560,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * Not that this function requires that all data in the windows is buffered until the window
    * is evaluated, as the function provides no means of pre-aggregation.
    *
+   * @param function The process window function.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  @PublicEvolving
+  def process[R: TypeInformation](
+      function: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+    // The wrapper exposes the cleaned Scala function as a Java ProcessAllWindowFunction.
+    val wrappedFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](clean(function))
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.process(wrappedFunction, resultType))
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Note that this function requires that all data in the windows is buffered until the window
+   * is evaluated, as the function provides no means of pre-aggregation.
+   *
    * @param function The window function.
    * @return The data stream that is the result of applying the window function to the window.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
new file mode 100644
index 0000000..163117b
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base abstract class for functions that are evaluated over non-keyed
+  * windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam W The type of the window.
+  */
+@PublicEvolving
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param context  The context in which the window is being evaluated.
+    * @param elements The elements in the window being evaluated.
+    * @param out      A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
+
+  /**
+    * The context holding window metadata
+    */
+  abstract class Context {
+    /**
+      * @return The window that is being evaluated.
+      */
+    def window: W
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..22d64a8
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base abstract class for rich functions that are evaluated over non-keyed
+  * windows using a context for retrieving extra information.
+  *
+  * @tparam IN The type of the input value.
+  * @tparam OUT The type of the output value.
+  * @tparam W The type of the window.
+  */
+@Public
+abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
+    extends ProcessAllWindowFunction[IN, OUT, W]
+    with RichFunction {
+
+  // Scala's @transient marks the field itself as transient. java.beans.Transient is a
+  // JavaBeans annotation for methods and does not exclude this field from serialization.
+  @transient
+  private var runtimeContext: RuntimeContext = null
+
+  // --------------------------------------------------------------------------------------------
+  //  Runtime context access
+  // --------------------------------------------------------------------------------------------
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    this.runtimeContext = t
+  }
+
+  override def getRuntimeContext: RuntimeContext = {
+    if (this.runtimeContext != null) {
+      this.runtimeContext
+    } else {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    }
+  }
+
+  override def getIterationRuntimeContext: IterationRuntimeContext = {
+    if (this.runtimeContext == null) {
+      throw new IllegalStateException("The runtime context has not been initialized.")
+    } else {
+      this.runtimeContext match {
+        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
+        case _ =>
+          throw new IllegalStateException("This stub is not part of an iteration step function.")
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Default life cycle methods
+  // --------------------------------------------------------------------------------------------
+
+  @throws[Exception]
+  override def open(parameters: Configuration): Unit = {
+  }
+
+  @throws[Exception]
+  override def close(): Unit = {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 23293a6..a4fec64 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,16 @@
 
 package org.apache.flink.streaming.api.scala.function.util
 
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -34,8 +42,8 @@ import scala.collection.JavaConverters._
   *   - Java WindowFunction: java.lang.Iterable
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
-    private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
-    extends JProcessWindowFunction[IN, OUT, KEY, W] {
+    private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
+    extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -47,4 +55,75 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     }
     func.process(key, ctx, elements.asScala, out)
   }
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    super.setRuntimeContext(t)
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
+      case _ =>
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
+      case _ =>
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    func match {
+      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
+      case _ =>
+    }
+  }
+}
+
+/**
+  * A wrapper function that exposes a Scala ProcessAllWindowFunction
+  * as a Java (rich) ProcessAllWindowFunction.
+  *
+  * The Scala and Java Window functions differ in their type of "Iterable":
+  *   - Scala WindowFunction: scala.Iterable
+  *   - Java WindowFunction: java.lang.Iterable
+  */
+final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
+    private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
+    extends JRichProcessAllWindowFunction[IN, OUT, W] {
+
+  override def process(
+      context: JProcessAllWindowFunction[IN, OUT, W]#Context,
+      elements: java.lang.Iterable[IN],
+      out: Collector[OUT]): Unit = {
+    // The Scala function declares its own Context type, so the Java context's window is
+    // re-exposed through an instance of it.
+    val scalaContext = new func.Context { override def window = context.window }
+    func.process(scalaContext, elements.asScala, out)
+  }
+
+  override def setRuntimeContext(t: RuntimeContext): Unit = {
+    super.setRuntimeContext(t)
+    func match {
+      case rich: ScalaRichProcessAllWindowFunction[IN, OUT, W] => rich.setRuntimeContext(t)
+      case _ => // not a rich function: nothing to forward
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    func match {
+      case rich: ScalaRichProcessAllWindowFunction[IN, OUT, W] => rich.open(parameters)
+      case _ => // not a rich function: nothing to forward
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    func match {
+      case rich: ScalaRichProcessAllWindowFunction[IN, OUT, W] => rich.close()
+      case _ => // not a rich function: nothing to forward
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 7e067a0..ee9f50c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -356,6 +356,85 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testReduceWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    // Window function that forwards every element it receives unchanged.
+    val forwardAll = new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def process(
+          context: Context,
+          elements: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = elements.foreach(x => out.collect(x))
+    }
+
+    val windowed = env
+      .fromElements(("hello", 1), ("hello", 2))
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+      .reduce(new DummyReducer, forwardAll)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testReduceWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    // Window function that forwards every element it receives unchanged.
+    val forwardAll = new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def process(
+          context: Context,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = input.foreach(x => out.collect(x))
+    }
+
+    val windowed = env.fromElements(("hello", 1), ("hello", 2))
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+      .reduce(new DummyReducer, forwardAll)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreReducerEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -566,6 +645,72 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testAggregateWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = env.fromElements(("hello", 1), ("hello", 2))
+    val assigner = TumblingEventTimeWindows.of(Time.seconds(1))
+
+    val windowed = elements.windowAll(assigner)
+      .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testAggregateWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val elements = env.fromElements(("hello", 1), ("hello", 2))
+    val assigner = TumblingProcessingTimeWindows.of(Time.seconds(1))
+
+    val windowed = elements.windowAll(assigner)
+      .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -789,6 +934,88 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testFoldWithProcessWindowFunctionEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    // Window function that emits the first and third field of every input value.
+    val project =
+      new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+        override def process(
+            context: Context,
+            input: Iterable[(String, String, Int)],
+            out: Collector[(String, Int)]): Unit = input.foreach(x => out.collect((x._1, x._3)))
+      }
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    val windowed = source
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder, project)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testFoldWithProcessWindowFunctionProcessingTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    // Window function that emits the first and third field of every input value.
+    val project =
+      new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+        override def process(
+            context: Context,
+            input: Iterable[(String, String, Int)],
+            out: Collector[(String, Int)]): Unit = input.foreach(x => out.collect((x._1, x._3)))
+      }
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    val windowed = source
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .fold(("", "", 1), new DummyFolder, project)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyWithPreFolderEventTime() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -951,6 +1178,84 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testProcessEventTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    // Window function that re-emits every element as a new tuple with the same fields.
+    val copyAll = new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def process(
+          context: Context,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = input.foreach(x => out.collect((x._1, x._2)))
+    }
+
+    val windowed = env.fromElements(("hello", 1), ("hello", 2))
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(copyAll)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
+  def testProcessProcessingTimeTime() {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    // Window function that re-emits every element as a new tuple with the same fields.
+    val copyAll = new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+      override def process(
+          context: Context,
+          input: Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = input.foreach(x => out.collect((x._1, x._2)))
+    }
+
+    val windowed = env.fromElements(("hello", 1), ("hello", 2))
+      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .process(copyAll)
+
+    // Unwrap the Java transformation to inspect the operator that was generated.
+    val transformation = windowed.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+    val operator = transformation.getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+    val windowOp = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    // Check the trigger, window assigner and state descriptor types of the operator.
+    assertTrue(windowOp.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(windowOp.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(windowOp.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    // Hand the operator to the shared processElementAndEnsureOutput helper.
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOp,
+      windowOp.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testApplyEventTimeWithScalaFunction() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1095,6 +1400,46 @@ class AllWindowTranslationTest {
   }
 
   @Test
+  def testProcessWithCustomTrigger(): Unit = {
+    // Verifies that a user-supplied CountTrigger replaces the default trigger when `process()`
+    // is applied to tumbling event-time all-windows, and that list state is still used.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = env.fromElements(("hello", 1), ("hello", 2))
+
+    val processed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .trigger(CountTrigger.of(1))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          // forwards a copy of every element contained in the window
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = {
+            for (element <- input) {
+              out.collect((element._1, element._2))
+            }
+          }
+        })
+
+    // unwrap the operator that the Scala API created for the window transformation
+    val operator = processed
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+
+    assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+    val windowOperator = operator
+      .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+    assertTrue(windowOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(windowOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(windowOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      windowOperator,
+      windowOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
+
+  @Test
   def testReduceWithEvictor() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1211,6 +1556,46 @@ class AllWindowTranslationTest {
       ("hello", 1))
   }
 
+  /**
+    * Verifies that `process()` on tumbling event-time all-windows with a [[CountEvictor]] is
+    * translated to an [[EvictingWindowOperator]] that keeps the [[EventTimeTrigger]], carries
+    * the evictor and uses list state, and that it produces output for an input element.
+    */
+  @Test
+  def testProcessWithEvictor(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    val elements = env.fromElements(("hello", 1), ("hello", 2))
+
+    val processed = elements
+      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+      .evictor(CountEvictor.of(100))
+      .process(
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          // forwards a copy of every element contained in the window
+          override def process(
+              context: Context,
+              input: Iterable[(String, Int)],
+              out: Collector[(String, Int)]): Unit = {
+            for (element <- input) {
+              out.collect((element._1, element._2))
+            }
+          }
+        })
+
+    // unwrap the operator that the Scala API created for the window transformation
+    val operator = processed
+      .javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+      .getOperator
+
+    assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+    val evictingOperator = operator
+      .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+    assertTrue(evictingOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+    assertTrue(evictingOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+    assertTrue(evictingOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+    assertTrue(evictingOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+    processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+      evictingOperator,
+      evictingOperator.getKeySelector,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ("hello", 1))
+  }
 
   /**
     * Ensure that we get some output from the given operator when pushing in an element and
@@ -1218,10 +1603,10 @@ class AllWindowTranslationTest {
     */
   @throws[Exception]
   private def processElementAndEnsureOutput[K, IN, OUT](
-                                                         operator: OneInputStreamOperator[IN, OUT],
-                                                         keySelector: KeySelector[IN, K],
-                                                         keyType: TypeInformation[K],
-                                                         element: IN) {
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, K],
+      keyType: TypeInformation[K],
+      element: IN) {
     val testHarness =
       new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
 
@@ -1243,7 +1628,8 @@ class AllWindowTranslationTest {
   }
 }
 
-class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+class TestAllWindowFunction
+    extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
 
   override def apply(
       window: TimeWindow,
@@ -1253,3 +1639,15 @@ class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, In
     input.foreach(out.collect)
   }
 }
+
+/**
+  * Test [[ProcessAllWindowFunction]] over `(String, Int)` tuples and [[TimeWindow]]s that emits
+  * every element of the window unchanged.
+  */
+class TestProcessAllWindowFunction
+    extends ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+
+  override def process(
+      context: Context,
+      input: Iterable[(String, Int)],
+      out: Collector[(String, Int)]): Unit = {
+    for (element <- input) {
+      out.collect(element)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index a23145c..dc38758 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -150,7 +150,6 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  @Ignore
   def testFoldWithProcessWindowFunction(): Unit = {
     WindowFoldITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -310,6 +309,63 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  /**
+    * Runs a fold over tumbling event-time all-windows of 3 milliseconds whose result is passed
+    * through a [[CheckingIdentityRichProcessAllWindowFunction]], then checks the folded results
+    * and that the rich function's lifecycle methods were invoked.
+    */
+  @Test
+  def testFoldAllWithProcessWindowFunction(): Unit = {
+    WindowFoldITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+
+    // concatenates the string fields and sums the integer fields
+    val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
+      override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
+        val (accText, accSum) = accumulator
+        val (text, number) = value
+        (accText + text, accSum + number)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
+        val records = Seq(
+          ("a", 0), ("a", 1), ("a", 2),
+          ("b", 3), ("a", 3), ("b", 4), ("a", 4), ("b", 5), ("a", 5))
+        records.foreach(record => ctx.collect(record))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel(): Unit = {}
+    }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+    val folded = source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .fold(
+        ("R:", 0),
+        foldFunc,
+        new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+
+    // collect the string form of every result for the final comparison
+    folded.addSink(new SinkFunction[(String, Int)]() {
+      def invoke(value: (String, Int)): Unit = {
+        WindowFoldITCase.testResults += value.toString
+      }
+    })
+
+    env.execute("Fold All-Window Test")
+
+    val expectedResult = mutable.MutableList("(R:aaa,3)", "(R:bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index bfbe6ee..eb9f361 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -87,7 +87,6 @@ class WindowFunctionITCase {
   }
 
   @Test
-  @Ignore
   def testRichProcessWindowFunction(): Unit = {
     WindowFunctionITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -183,6 +182,54 @@ class WindowFunctionITCase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  /**
+    * Sends nine records through tumbling event-time all-windows of 3 milliseconds that are
+    * evaluated by a [[CheckingIdentityRichProcessAllWindowFunction]], then checks that every
+    * record was forwarded and that the rich function's lifecycle methods were invoked.
+    */
+  @Test
+  def testRichProcessAllWindowFunction(): Unit = {
+    WindowFunctionITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 6))
+        ctx.collect(("a", 7))
+        ctx.collect(("a", 8))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {}
+
+    }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .process(new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowFunctionITCase.testResults += value.toString
+        }
+      })
+
+    // job name matches the function under test (was the AllWindowFunction test's name)
+    env.execute("RichProcessAllWindowFunction Test")
+
+    val expectedResult = mutable.MutableList(
+      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+      "(b,3)", "(b,4)", "(b,5)")
+
+    assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 object WindowFunctionITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 5418108..ee1dbfd 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -149,7 +149,6 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  @Ignore
   def testReduceWithProcessWindowFunction(): Unit = {
     WindowReduceITCase.testResults = mutable.MutableList()
     CheckingIdentityRichProcessWindowFunction.reset()
@@ -307,6 +306,62 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
 
     CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
   }
+
+  /**
+    * Runs a reduce over tumbling event-time all-windows of 3 milliseconds whose result is
+    * passed through a [[CheckingIdentityRichProcessAllWindowFunction]], then checks the reduced
+    * results and that the rich function's lifecycle methods were invoked.
+    */
+  @Test
+  def testReduceAllWithProcessWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichProcessAllWindowFunction.reset()
+
+    // concatenates the string fields and sums the integer fields
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce(
+        reduceFunc,
+        new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    // this job reduces, it does not fold; the name previously said "Fold"
+    env.execute("Reduce All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+  }
 }
 
 object WindowReduceITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..df005fa
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+/**
+  * Identity [[RichProcessAllWindowFunction]] for tests: it forwards every input element and
+  * records in its companion object whether `setRuntimeContext()`, `open()` and `close()` were
+  * called.
+  */
+class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
+  extends RichProcessAllWindowFunction[T, T, W] {
+
+  override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = {
+    input.foreach(value => out.collect(value))
+  }
+
+  override def setRuntimeContext(context: RuntimeContext): Unit = {
+    super.setRuntimeContext(context)
+    CheckingIdentityRichProcessAllWindowFunction.contextSet = true
+  }
+
+  override def open(conf: Configuration): Unit = {
+    super.open(conf)
+    CheckingIdentityRichProcessAllWindowFunction.openCalled = true
+  }
+
+  override def close(): Unit = {
+    super.close()
+    CheckingIdentityRichProcessAllWindowFunction.closeCalled = true
+  }
+}
+
+/**
+  * Holds the flags set by [[CheckingIdentityRichProcessAllWindowFunction]] instances, together
+  * with helpers to clear them before a test and to verify them afterwards.
+  */
+object CheckingIdentityRichProcessAllWindowFunction {
+
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var closeCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var openCalled = false
+
+  @volatile
+  private[CheckingIdentityRichProcessAllWindowFunction] var contextSet = false
+
+  /** Clears all recorded flags. */
+  def reset(): Unit = {
+    contextSet = false
+    openCalled = false
+    closeCalled = false
+  }
+
+  /**
+    * Throws an [[AssertionError]] naming the first lifecycle call that was not recorded,
+    * checking the runtime context first, then `open()`, then `close()`.
+    */
+  def checkRichMethodCalls(): Unit = {
+    failUnless(contextSet, "context not set")
+    failUnless(openCalled, "open() not called")
+    failUnless(closeCalled, "close() not called")
+  }
+
+  /** Throws an [[AssertionError]] carrying `message` when `flag` is false. */
+  private def failUnless(flag: Boolean, message: String): Unit = {
+    if (!flag) {
+      throw new AssertionError(message)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 7d37d1a..903179d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -253,6 +254,78 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 		Assert.assertEquals(expectedResult, testResults);
 	}
 
+	@Test
+	public void testFoldProcessAllWindow() throws Exception {
+
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+		source1
+			.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+			.fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+				@Override
+				public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+					accumulator.f1 += value.f0;
+					accumulator.f0 += value.f1;
+					return accumulator;
+				}
+			}, new ProcessAllWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, TimeWindow>() {
+				@Override
+				public void process(Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+					int i = 0;
+					for (Tuple2<Integer, String> in : elements) {
+						out.collect(new Tuple3<>(in.f1, in.f0, i++));
+					}
+				}
+			})
+			.addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+				@Override
+				public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+					testResults.add(value.toString());
+				}
+			});
+
+		env.execute("Fold Process Window Test");
+
+		List<String> expectedResult = Arrays.asList(
+			"(R:aaa,3,0)",
+			"(R:aaa,21,0)",
+			"(R:bbb,12,0)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
 	private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
 
 		@Override


Mime
View raw message