flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [10/15] flink git commit: [FLINK-3995] [build] flink-test-utils also contains the streaming test utilities.
Date Tue, 05 Jul 2016 14:38:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
new file mode 100644
index 0000000..33c8024
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimerException;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Tests for the timer service of {@code StreamTask}.
+ *
+ * <p>
+ * These tests ensure that exceptions are properly forwarded from the timer thread to
+ * the task thread and that operator methods are not invoked concurrently.
+ */
+@RunWith(Parameterized.class)
+public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
+
+	private final TimeCharacteristic timeCharacteristic;
+	
+	public StreamTaskTimerITCase(TimeCharacteristic characteristic) {
+		timeCharacteristic = characteristic;
+	}
+
+
+	/**
+	 * Note: this test fails if we don't check for exceptions in the source contexts and do not
+	 * synchronize in the source contexts.
+	 */
+	@Test
+	public void testOperatorChainedToSource() throws Exception {
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+
+	/**
+	 * Note: this test fails if we don't check for exceptions in the source contexts and do not
+	 * synchronize in the source contexts.
+	 */
+	@Test
+	public void testOneInputOperatorWithoutChaining() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+	
+	@Test
+	public void testTwoInputOperatorWithoutChaining() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.connect(source).transform(
+				"Custom Operator",
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new TwoInputTimerOperator(ChainingStrategy.NEVER));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+
+	public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable {
+		private static final long serialVersionUID = 1L;
+
+		int numTimers = 0;
+		int numElements = 0;
+
+		private boolean first = true;
+
+		private Semaphore semaphore = new Semaphore(1);
+
+		public TimerOperator(ChainingStrategy chainingStrategy) {
+			setChainingStrategy(chainingStrategy);
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+			
+			semaphore.release();
+		}
+
+		@Override
+		public void trigger(long time) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			try {
+				numTimers++;
+				throwIfDone();
+				registerTimer(System.currentTimeMillis() + 1, this);
+			} finally {
+				semaphore.release();
+			}
+		}
+
+		private void throwIfDone() {
+			if (numTimers > 1000 && numElements > 10_000) {
+				throw new RuntimeException("TEST SUCCESS");
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+			semaphore.release();
+		}
+	}
+
+	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, Triggerable {
+		private static final long serialVersionUID = 1L;
+
+		int numTimers = 0;
+		int numElements = 0;
+
+		private boolean first = true;
+
+		private Semaphore semaphore = new Semaphore(1);
+
+		public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
+			setChainingStrategy(chainingStrategy);
+		}
+
+		@Override
+		public void processElement1(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+
+			semaphore.release();
+		}
+
+		@Override
+		public void processElement2(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+
+			semaphore.release();
+		}
+
+
+		@Override
+		public void trigger(long time) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			try {
+				numTimers++;
+				throwIfDone();
+				registerTimer(System.currentTimeMillis() + 1, this);
+			} finally {
+				semaphore.release();
+			}
+		}
+
+		private void throwIfDone() {
+			if (numTimers > 1000 && numElements > 10_000) {
+				throw new RuntimeException("TEST SUCCESS");
+			}
+		}
+
+		@Override
+		public void processWatermark1(Watermark mark) throws Exception {
+			//ignore
+		}
+
+		@Override
+		public void processWatermark2(Watermark mark) throws Exception {
+			//ignore
+		}
+	}
+
+
+	private static class InfiniteTestSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while (running) {
+				ctx.collect("hello");
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  parametrization
+	// ------------------------------------------------------------------------
+
+	@Parameterized.Parameters(name = "Time Characteristic = {0}")
+	public static Collection<Object[]> executionModes() {
+		return Arrays.asList(
+				new Object[] { TimeCharacteristic.ProcessingTime },
+				new Object[] { TimeCharacteristic.IngestionTime },
+				new Object[] { TimeCharacteristic.EventTime });
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
new file mode 100644
index 0000000..d69c140
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for timestamps, watermarks, and event-time sources.
+ */
+@SuppressWarnings("serial")
+public class TimestampITCase extends TestLogger {
+
+	// cluster topology: NUM_TASK_MANAGERS TaskManagers x NUM_TASK_SLOTS slots each
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int NUM_TASK_SLOTS = 3;
+	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+	// this is used in some tests to synchronize: sources block on latch.await()
+	// between elements (the latch is triggered elsewhere — not visible in this chunk)
+	static MultiShotLatch latch;
+
+
+	// shared mini cluster, started/stopped in @BeforeClass/@AfterClass
+	private static ForkableFlinkMiniCluster cluster;
+
+	@Before
+	public void setupLatch() {
+		// ensure that we get a fresh latch for each test
+		latch = new MultiShotLatch();
+	}
+
+
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * These check whether custom timestamp emission works at sources and also whether timestamps
+	 * arrive at operators throughout a topology.
+	 *
+	 * <p>
+	 * This also checks whether watermarks keep propagating if a source closes early.
+	 *
+	 * <p>
+	 * This only uses map to test the workings of watermarks in a complete, running topology. All
+	 * tasks and stream operators have dedicated tests that test the watermark propagation
+	 * behaviour.
+	 */
+	@Test
+	public void testWatermarkPropagation() throws Exception {
+		final int NUM_WATERMARKS = 10;
+
+		long initialTime = 0L;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		// source2 emits only half as many watermarks as source1 ("closes early")
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
+
+		source1.union(source2)
+				.map(new IdentityMap())
+				.connect(source2).map(new IdentityCoMap())
+				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+				.addSink(new DiscardingSink<Integer>());
+
+		env.execute();
+
+		// verify that all the watermarks arrived at the final custom operator
+		// (CustomOperator.finalWatermarks — defined elsewhere in this file, not
+		// visible in this chunk — apparently records one watermark list per subtask)
+		for (int i = 0; i < PARALLELISM; i++) {
+			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
+			// other source stops emitting after that
+			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
+				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
+					System.err.println("All Watermarks: ");
+					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
+						System.err.println(CustomOperator.finalWatermarks[i].get(k));
+					}
+
+					fail("Wrong watermark.");
+				}
+			}
+			
+			// the sources are finite, so the very last watermark must be the MAX watermark
+			assertEquals(Watermark.MAX_WATERMARK,
+					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+		}
+	}
+
+	/**
+	 * Verifies that when a job is explicitly stopped (rather than finishing on its
+	 * own), the sources do not emit a final Long.MAX_VALUE watermark: the last
+	 * observed watermark per subtask must not be {@code Watermark.MAX_WATERMARK}.
+	 */
+	@Test
+	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
+		
+		// for this test to work, we need to be sure that no other jobs are being executed
+		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+			Thread.sleep(100);
+		}
+		
+		final int NUM_WATERMARKS = 10;
+
+		long initialTime = 0L;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		// infinite sources: the job can only end by being stopped
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
+
+		source1.union(source2)
+				.map(new IdentityMap())
+				.connect(source2).map(new IdentityCoMap())
+				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+				.addSink(new DiscardingSink<Integer>());
+
+		// background thread that stops the job as soon as it shows up as running
+		new Thread("stopper") {
+			@Override
+			public void run() {
+				try {
+					// try until we get the running jobs
+					List<JobID> running;
+					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
+						Thread.sleep(50);
+					}
+
+					JobID id = running.get(0);
+					
+					// send stop until the job is stopped
+					do {
+						cluster.stopJob(id);
+						Thread.sleep(50);
+					} while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+		}.start();
+		
+		env.execute();
+
+		// verify that all the watermarks arrived at the final custom operator
+		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
+			
+			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
+			// other source stops emitting after that
+			for (int j = 0; j < subtaskWatermarks.size(); j++) {
+				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
+					System.err.println("All Watermarks: ");
+					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
+						System.err.println(subtaskWatermarks.get(k));
+					}
+
+					fail("Wrong watermark.");
+				}
+			}
+			
+			// if there are watermarks, the final one must not be the MAX watermark
+			if (subtaskWatermarks.size() > 0) {
+				assertNotEquals(Watermark.MAX_WATERMARK,
+						subtaskWatermarks.get(subtaskWatermarks.size()-1));
+			}
+		}
+	}
+
+	/**
+	 * Checks that timestamps assigned at the sources are correctly handled across
+	 * network transmission and between chained operators when timestamps are enabled.
+	 */
+	@Test
+	public void testTimestampHandling() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Integer> firstSource = env.addSource(new MyTimestampSource(0L, numElements));
+		DataStream<Integer> secondSource = env.addSource(new MyTimestampSource(0L, numElements));
+
+		// the checking operator at the end verifies the timestamps it receives
+		firstSource
+				.map(new IdentityMap())
+				.connect(secondSource).map(new IdentityCoMap())
+				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
+				.addSink(new DiscardingSink<Integer>());
+
+		env.execute();
+	}
+
+	/**
+	 * Checks that timestamps are properly ignored when they are disabled
+	 * (processing-time characteristic).
+	 */
+	@Test
+	public void testDisabledTimestamps() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Integer> firstSource = env.addSource(new MyNonWatermarkingSource(numElements));
+		DataStream<Integer> secondSource = env.addSource(new MyNonWatermarkingSource(numElements));
+
+		// the checking operator at the end verifies that no timestamps show up
+		firstSource
+				.map(new IdentityMap())
+				.connect(secondSource).map(new IdentityCoMap())
+				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
+				.addSink(new DiscardingSink<Integer>());
+
+		env.execute();
+	}
+
+	/**
+	 * This tests whether timestamps are properly extracted in the timestamp
+	 * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
+	 * interval.
+	 */
+	@Test
+	public void testTimestampExtractorWithAutoInterval() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(10);
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
+					ctx.collect(index);
+					// pace the source: block on the shared latch before emitting the
+					// next element (the latch is triggered elsewhere — not visible in
+					// this chunk — so auto-interval watermarks can fire in between)
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {}
+		});
+
+		// element values double as their event-time timestamps (strictly ascending)
+		DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
+				new AscendingTimestampExtractor<Integer>() {
+					@Override
+					public long extractAscendingTimestamp(Integer element) {
+						return element;
+					}
+				});
+
+		extractOp
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+				.transform("Timestamp Check",
+						BasicTypeInfo.INT_TYPE_INFO,
+						new TimestampCheckingOperator());
+
+		// verify that extractor picks up source parallelism
+		Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
+				long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
+				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
+			}
+		}
+		
+		// the input is finite, so it should have a MAX Watermark
+		assertEquals(Watermark.MAX_WATERMARK, 
+				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
+	}
+
+	/**
+	 * This tests whether timestamps are properly extracted in the timestamp
+	 * extractor and whether watermarks are correctly forwarded from the custom watermark emit
+	 * function.
+	 */
+	@Test
+	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(10);
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
+					ctx.collect(index);
+					// block on the shared latch between elements (triggered elsewhere,
+					// not visible in this chunk)
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {}
+		});
+
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
+					
+					// element value doubles as its event-time timestamp
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
+
+					// emit a punctuated watermark one behind every element
+					@Override
+					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return new Watermark(extractedTimestamp - 1);
+					}
+				})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+
+		// the input is finite, so it should have a MAX Watermark
+		assertEquals(Watermark.MAX_WATERMARK,
+				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
+	}
+
+	/**
+	 * This test verifies that the timestamp extractor does not emit decreasing watermarks, even
+	 * when the elements (and hence the watermarks returned by the custom watermark
+	 * emit function) arrive out of order: only monotonically increasing watermarks
+	 * must be observed downstream.
+	 */
+	@Test
+	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(1);
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
+					// emit out of order: index, then (after a pause) index - 1
+					ctx.collect(index);
+					Thread.sleep(100);
+					ctx.collect(index - 1);
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {}
+		});
+
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
+
+					@Override
+					public long extractTimestamp(Integer element, long previousTimestamp) {
+						return element;
+					}
+
+					// returns a decreasing watermark whenever the element is out of order
+					@Override
+					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return new Watermark(extractedTimestamp - 1);
+					}
+				})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
+				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS monotonically increasing watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+		// the input is finite, so it should have a MAX Watermark
+		assertEquals(Watermark.MAX_WATERMARK,
+				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
+	}
+
+	/**
+	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
+	 */
+	@Test
+	public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(1);
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
+					ctx.collectWithTimestamp(index, index);
+					ctx.collectWithTimestamp(index - 1, index - 1);
+					index++;
+					ctx.emitWatermark(new Watermark(index-2));
+				}
+
+				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
+				// we only see one in the result
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+
+			@Override
+			public void cancel() {}
+		});
+
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
+
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
+
+					// never emits punctuated watermarks of its own; only the source's
+					// Long.MAX_VALUE watermark can be forwarded
+					@Override
+					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
+						return null;
+					}
+				})
+			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+
+
+		env.execute();
+
+		// the MAX_VALUE watermark was emitted twice, but only a single one may be
+		// observed downstream
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
+	}
+
+	/**
+	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
+	 * 
+	 * Same test as before, but using a different timestamp extractor
+	 * (a periodic assigner instead of a punctuated one).
+	 */
+	@Test
+	public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(10);
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 1;
+				while (index <= NUM_ELEMENTS) {
+					ctx.collectWithTimestamp(index, index);
+					ctx.collectWithTimestamp(index - 1, index - 1);
+					index++;
+					ctx.emitWatermark(new Watermark(index-2));
+				}
+
+				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
+				// we only see one in the result
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+			}
+
+			@Override
+			public void cancel() {}
+		});
+
+		source1
+				.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() {
+
+					@Override
+					public long extractTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
+
+					// periodic assigner that never produces watermarks of its own;
+					// only the source's Long.MAX_VALUE watermark can be forwarded
+					@Override
+					public Watermark getCurrentWatermark() {
+						return null;
+					}
+				})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+		
+		env.execute();
+
+		// only a single MAX_VALUE watermark may be observed downstream
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
+	}
+
+	/**
+	 * This verifies that an event time source works when setting stream time characteristic to
+	 * processing time. In this case, the watermarks should just be swallowed.
+	 */
+	@Test
+	public void testEventTimeSourceWithProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = 
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10));
+
+		source1
+			.map(new IdentityMap())
+			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
+
+		env.execute();
+
+		// verify that we don't get any watermarks, the source is used as watermark source in
+		// other tests, so it normally emits watermarks
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
+	}
+	
+	@Test
+	public void testErrorOnEventTimeOverProcessingTime() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source1 = 
+				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
+
+		source1
+				.keyBy(0)
+				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
+						return value1;
+					}
+				})
+				.print();
+
+		try {
+			env.execute();
+			fail("this should fail with an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testErrorOnEventTimeWithoutTimestamps() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Tuple2<String, Integer>> source1 =
+				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
+
+		source1
+				.keyBy(0)
+				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
+						return value1;
+					}
+				})
+				.print();
+
+		try {
+			env.execute();
+			fail("this should fail with an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Custom Operators and Functions
+	// ------------------------------------------------------------------------
+	
	/**
	 * Chained operator that records every watermark it sees, asserts that watermarks
	 * arrive strictly ascending, optionally checks that each element's timestamp equals
	 * its integer value, and forwards both elements and watermarks downstream.
	 */
	@SuppressWarnings("unchecked")
	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {

		// watermarks seen by this subtask, in arrival order (initialized in open())
		List<Watermark> watermarks;
		// per-subtask watermark lists, published in close() and inspected by the tests;
		// PARALLELISM is a constant of the enclosing test class
		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
		private final boolean timestampsEnabled;

		public CustomOperator(boolean timestampsEnabled) {
			setChainingStrategy(ChainingStrategy.ALWAYS);
			this.timestampsEnabled = timestampsEnabled;
		}

		@Override
		public void processElement(StreamRecord<Integer> element) throws Exception {
			// the tests emit each element with a timestamp equal to its value,
			// so a mismatch means timestamps were lost or altered en route
			if (timestampsEnabled) {
				if (element.getTimestamp() != element.getValue()) {
					Assert.fail("Timestamps are not properly handled.");
				}
			}
			output.collect(element);
		}

		@Override
		public void processWatermark(Watermark mark) throws Exception {
			// watermarks must be strictly ascending; this also ensures a duplicate
			// final Long.MAX_VALUE watermark is never forwarded twice
			for (Watermark previousMark: watermarks) {
				assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
			}
			watermarks.add(mark);
			// 'latch' is a field of the enclosing test class; tests block on it until
			// at least one watermark has been observed
			latch.trigger();
			output.emitWatermark(mark);
		}

		@Override
		public void open() throws Exception {
			super.open();
			watermarks = new ArrayList<>();
		}

		@Override
		public void close() throws Exception {
			super.close();
			// publish this subtask's recorded watermarks for the test method to inspect
			finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
		}
	}
+
+	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+
+		public TimestampCheckingOperator() {
+			setChainingStrategy(ChainingStrategy.ALWAYS);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Integer> element) throws Exception {
+			if (element.getTimestamp() != element.getValue()) {
+				Assert.fail("Timestamps are not properly handled.");
+			}
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {}
+	}
+
+	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+
+		@Override
+		public void processElement(StreamRecord<Integer> element) throws Exception {
+			if (element.hasTimestamp()) {
+				Assert.fail("Timestamps are not properly handled.");
+			}
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {}
+	}
+
+	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
+		@Override
+		public Integer map1(Integer value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public Integer map2(Integer value) throws Exception {
+			return value;
+		}
+	}
+
+	public static class IdentityMap implements MapFunction<Integer, Integer> {
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+	}
+
+	public static class MyTimestampSource implements SourceFunction<Integer> {
+
+		private final long initialTime;
+		private final int numWatermarks;
+
+		public MyTimestampSource(long initialTime, int numWatermarks) {
+			this.initialTime = initialTime;
+			this.numWatermarks = numWatermarks;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			for (int i = 0; i < numWatermarks; i++) {
+				ctx.collectWithTimestamp(i, initialTime + i);
+				ctx.emitWatermark(new Watermark(initialTime + i));
+			}
+		}
+
+		@Override
+		public void cancel() {}
+	}
+
+	public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
+
+		private final long initialTime;
+		private final int numWatermarks;
+
+		private volatile boolean running = true;
+		
+		public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
+			this.initialTime = initialTime;
+			this.numWatermarks = numWatermarks;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			for (int i = 0; i < numWatermarks; i++) {
+				ctx.collectWithTimestamp(i, initialTime + i);
+				ctx.emitWatermark(new Watermark(initialTime + i));
+			}
+			
+			while (running) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public void stop() {
+			running = false;
+		}
+	}
+
+	public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
+
+		int numWatermarks;
+
+		public MyNonWatermarkingSource(int numWatermarks) {
+			this.numWatermarks = numWatermarks;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			for (int i = 0; i < numWatermarks; i++) {
+				ctx.collect(i);
+			}
+		}
+
+		@Override
+		public void cancel() {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
new file mode 100644
index 0000000..1e3e3d5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
/**
 * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
 * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
 */
@SuppressWarnings("serial")
public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {

	// sink output collected here; re-initialized at the start of each test and
	// read after env.execute() returns (parallelism is 1)
	private static List<String> testResults;

	/**
	 * Folds keyed tumbling 3ms event-time windows: f0 strings are concatenated onto the
	 * "R:" prefix and f1 values summed per window.
	 */
	@Test
	public void testFoldWindow() throws Exception {

		testResults = new ArrayList<>();

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(1);

		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
				// each element's f1 doubles as its event-time timestamp
				// (see Tuple2TimestampExtractor below)
				ctx.collect(Tuple2.of("a", 0));
				ctx.collect(Tuple2.of("a", 1));
				ctx.collect(Tuple2.of("a", 2));

				ctx.collect(Tuple2.of("b", 3));
				ctx.collect(Tuple2.of("b", 4));
				ctx.collect(Tuple2.of("b", 5));

				ctx.collect(Tuple2.of("a", 6));
				ctx.collect(Tuple2.of("a", 7));
				ctx.collect(Tuple2.of("a", 8));

				// source is finite, so it will have an implicit MAX watermark when it finishes
			}

			@Override
			public void cancel() {}
			
		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

		source1
				.keyBy(0)
				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					@Override
					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
							Tuple2<String, Integer> value) throws Exception {
						// concatenate keys into f0, sum values into f1
						accumulator.f0 += value.f0;
						accumulator.f1 += value.f1;
						return accumulator;
					}
				})
				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
					@Override
					public void invoke(Tuple2<String, Integer> value) throws Exception {
						testResults.add(value.toString());
					}
				});

		env.execute("Fold Window Test");

		// key "a": windows [0,3) -> 0+1+2 and [6,9) -> 6+7+8; key "b": window [3,6) -> 3+4+5
		List<String> expectedResult = Arrays.asList(
				"(R:aaa,3)",
				"(R:aaa,21)",
				"(R:bbb,12)");

		Collections.sort(expectedResult);
		Collections.sort(testResults);

		Assert.assertEquals(expectedResult, testResults);
	}

	/**
	 * Same fold, but over non-keyed (all-window) tumbling 3ms event-time windows.
	 */
	@Test
	public void testFoldAllWindow() throws Exception {

		testResults = new ArrayList<>();

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(1);

		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

			@Override
			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
				// f1 doubles as the event-time timestamp (see Tuple2TimestampExtractor)
				ctx.collect(Tuple2.of("a", 0));
				ctx.collect(Tuple2.of("a", 1));
				ctx.collect(Tuple2.of("a", 2));

				ctx.collect(Tuple2.of("b", 3));
				ctx.collect(Tuple2.of("a", 3));
				ctx.collect(Tuple2.of("b", 4));
				ctx.collect(Tuple2.of("a", 4));
				ctx.collect(Tuple2.of("b", 5));
				ctx.collect(Tuple2.of("a", 5));

				// source is finite, so it will have an implicit MAX watermark when it finishes
			}

			@Override
			public void cancel() {
			}
		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

		source1
				.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
					@Override
					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
							Tuple2<String, Integer> value) throws Exception {
						// concatenate keys into f0, sum values into f1
						accumulator.f0 += value.f0;
						accumulator.f1 += value.f1;
						return accumulator;
					}
				})
				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
					@Override
					public void invoke(Tuple2<String, Integer> value) throws Exception {
						testResults.add(value.toString());
					}
				});

		env.execute("Fold All-Window Test");

		// all-window [0,3): a0+a1+a2; [3,6): b3,a3,b4,a4,b5,a5 in emission order
		List<String> expectedResult = Arrays.asList(
				"(R:aaa,3)",
				"(R:bababa,24)");

		Collections.sort(expectedResult);
		Collections.sort(testResults);

		Assert.assertEquals(expectedResult, testResults);
	}

	/**
	 * Punctuated assigner: each element's timestamp is its f1 value, and after every
	 * element a watermark trailing that timestamp by 1 ms is emitted.
	 */
	private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {

		@Override
		public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
			return element.f1;
		}

		@Override
		public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {
			return new Watermark(lastElement.f1 - 1);
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
new file mode 100644
index 0000000..8fc8372
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.streaming.runtime.util;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+import java.util.Collections;
+
+public class EvenOddOutputSelector implements OutputSelector<Integer> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Iterable<String> select(Integer value) {
+		return value % 2 == 0 ? Collections.singleton("even") : Collections.singleton("odd");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
new file mode 100644
index 0000000..6667446
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.streaming.runtime.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class NoOpIntMap implements MapFunction<Integer, Integer> {
+	private static final long serialVersionUID = 1L;
+
+	public Integer map(Integer value) throws Exception {
+		return value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
new file mode 100644
index 0000000..21d5294
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.streaming.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
+	private List<T> received;
+
+	public void invoke(T tuple) {
+		received.add(tuple);
+	}
+
+	public void open(Configuration conf) {
+		received = new ArrayList<T>();
+	}
+
+	public void close() {
+		assertTrue(received.size() > 0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
new file mode 100644
index 0000000..321d4c5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
/**
 * Sink for tests that stores every received element in a shared, id-addressed list
 * managed by {@link TestListWrapper}, so results can be inspected after execution.
 *
 * @param <T> type of the sunk records
 */
public class TestListResultSink<T> extends RichSinkFunction<T> {

	private static final long serialVersionUID = 1L;
	// id of this sink's list inside the TestListWrapper singleton; assigned on the
	// client when the sink is constructed and shipped with the serialized function
	private int resultListId;

	public TestListResultSink() {
		this.resultListId = TestListWrapper.getInstance().createList();
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
	}

	@Override
	public void invoke(T value) throws Exception {
		// lock the shared list itself: parallel sink instances with the same id
		// may append concurrently
		synchronized (resultList()) {
			resultList().add(value);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();
	}

	// resolves the shared result list for this sink's id via the singleton wrapper
	@SuppressWarnings("unchecked")
	private List<T> resultList() {
		synchronized (TestListWrapper.getInstance()) {
			return (List<T>) TestListWrapper.getInstance().getList(resultListId);
		}
	}

	/** Returns a snapshot copy of the collected results (taken under the list lock). */
	public List<T> getResult() {
		synchronized (resultList()) {
			ArrayList<T> copiedList = new ArrayList<T>(resultList());
			return copiedList;
		}
	}

	/** Returns the collected results sorted by natural order. */
	public List<T> getSortedResult() {
		// NOTE(review): TreeSet sorts AND de-duplicates — equal elements collapse to
		// one entry; confirm callers expect distinct results here
		synchronized (resultList()) {
			TreeSet<T> treeSet = new TreeSet<T>(resultList());
			ArrayList<T> sortedList = new ArrayList<T>(treeSet);
			return sortedList;
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
new file mode 100644
index 0000000..19ca8eb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
 * Process-wide registry of result lists, addressed by integer id. Sinks running in
 * different task threads register and fill lists here, so access must be thread-safe.
 */
public class TestListWrapper {

	private static TestListWrapper instance;

	@SuppressWarnings("rawtypes")
	private List<List<? extends Comparable>> lists;

	@SuppressWarnings("rawtypes")
	private TestListWrapper() {
		// synchronizedList so concurrent createList()/getList() calls are safe
		lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
	}

	/**
	 * Returns the singleton instance.
	 *
	 * <p>Synchronized: the previous unsynchronized lazy initialization could create two
	 * instances when parallel sink tasks call this concurrently, losing results.
	 *
	 * @return The singleton TestListWrapper.
	 */
	public static synchronized TestListWrapper getInstance() {
		if (instance == null) {
			instance = new TestListWrapper();
		}
		return instance;
	}

	/**
	 * Creates and stores a list, returns with the id.
	 *
	 * @return The ID of the list.
	 */
	@SuppressWarnings("rawtypes")
	public int createList() {
		lists.add(new ArrayList<Comparable>());
		return lists.size() - 1;
	}

	/**
	 * Returns the list registered under the given id.
	 *
	 * @param listId id returned by {@link #createList()}
	 * @return The stored list.
	 * @throws RuntimeException if no list is registered under the id.
	 */
	public List<?> getList(int listId) {
		@SuppressWarnings("rawtypes")
		List<? extends Comparable> list = lists.get(listId);
		if (list == null) {
			throw new RuntimeException("No such list.");
		}

		return list;
	}

}
\ No newline at end of file


Mime
View raw message