flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests
Date Wed, 11 Jan 2017 17:10:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 66de849..492d275 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,46 +17,59 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the api calls on {@link WindowedStream} instantiate the correct
+ * window operator.
+ *
+ * <p>We also create a test harness and push one element into the operator to verify
+ * that we get some output.
  */
 @SuppressWarnings("serial")
 public class WindowTranslationTest {
@@ -66,13 +79,13 @@ public class WindowTranslationTest {
 	 * in a {@code ReducingState}.
 	 */
 	@Test(expected = UnsupportedOperationException.class)
-	public void testReduceFailWithRichReducer() throws Exception {
+	public void testReduceWithRichReducerFails() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> window1 = source
+		source
 			.keyBy(0)
 			.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 			.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
@@ -84,163 +97,43 @@ public class WindowTranslationTest {
 					return null;
 				}
 			});
+
+		fail("exception was not thrown");
 	}
 
 	/**
-	 * These tests ensure that the correct trigger is set when using event-time windows.
+	 * .fold() does not support RichFoldFunction, since the fold function is used internally
+	 * in a {@code FoldingState}.
 	 */
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEventTime() throws Exception {
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFoldWithRichFolderFails() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testNonEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
+		source
 				.keyBy(0)
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.trigger(CountTrigger.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
+				.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = -6448847205314995812L;
 
 					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return null;
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+		fail("exception was not thrown");
 	}
 
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.evictor(CountEvictor.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
-	}
 
 	@Test
-	public void testSessionWithFold() throws Exception {
+	public void testSessionWithFoldFails() throws Exception {
 		// verify that fold does not work with merging windows
 
-		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
 				.keyBy(new KeySelector<String, String>() {
@@ -272,10 +165,10 @@ public class WindowTranslationTest {
 	}
 
 	@Test
-	public void testMergingAssignerWithNonMergingTrigger() throws Exception {
+	public void testMergingAssignerWithNonMergingTriggerFails() throws Exception {
 		// verify that we check for trigger compatibility
 
-		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
 				.keyBy(new KeySelector<String, String>() {
@@ -331,7 +224,649 @@ public class WindowTranslationTest {
 		fail("The trigger call should fail.");
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	/**
+	 * Ignored because we currently don't have the fast processing-time window operator.
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	@Ignore
+	public void testReduceFastProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator);
+
+		processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(new DummyReducer(), new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	/**
+	 * Test for the deprecated .apply(Reducer, WindowFunction).
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreReducerEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String,  Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyProcessingTimeTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.trigger(CountTrigger.of(1))
+				.reduce(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.trigger(CountTrigger.of(1))
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.reduce(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public void testFoldWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	/**
+	 * Ensure that we get some output from the given operator when pushing in an element and
+	 * setting watermark and processing time to {@code Long.MAX_VALUE}.
+	 */
+	private static <K, IN, OUT> void processElementAndEnsureOutput(
+			OneInputStreamOperator<IN, OUT> operator,
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType,
+			IN element) throws Exception {
+
+		KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						operator,
+						keySelector,
+						keyType);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0);
+		testHarness.processWatermark(Long.MIN_VALUE);
+
+		testHarness.processElement(new StreamRecord<>(element, 0));
+
+		// provoke any processing-time/event-time triggers
+		testHarness.setProcessingTime(Long.MAX_VALUE);
+		testHarness.processWatermark(Long.MAX_VALUE);
 
+		// we at least get the two watermarks and should also see an output element
+		assertTrue(testHarness.getOutput().size() >= 3);
+
+		testHarness.close();
+	}
 
 	// ------------------------------------------------------------------------
 	//  UDFs
@@ -345,4 +880,23 @@ public class WindowTranslationTest {
 			return value1;
 		}
 	}
+
+	private static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
+		@Override
+		public Tuple3<String, String, Integer> fold(
+				Tuple3<String, String, Integer> accumulator,
+				Tuple2<String, Integer> value) throws Exception {
+			return accumulator;
+		}
+	}
+
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3ad451/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index b0cc961..ddc3e05 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -106,6 +106,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 	<build>


Mime
View raw message