flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [12/15] flink git commit: [FLINK-3995] [build] flink-test-utils also contains the streaming test utilities.
Date Tue, 05 Jul 2016 14:38:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
deleted file mode 100644
index 6c1d0e6..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.timestamp;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for timestamps, watermarks, and event-time sources.
- */
-@SuppressWarnings("serial")
-public class TimestampITCase extends TestLogger {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	// this is used in some tests to synchronize
-	static MultiShotLatch latch;
-
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@Before
-	public void setupLatch() {
-		// ensure that we get a fresh latch for each test
-		latch = new MultiShotLatch();
-	}
-
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-			cluster = new ForkableFlinkMiniCluster(config, false);
-
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * These check whether custom timestamp emission works at sources and also whether timestamps
-	 * arrive at operators throughout a topology.
-	 *
-	 * <p>
-	 * This also checks whether watermarks keep propagating if a source closes early.
-	 *
-	 * <p>
-	 * This only uses map to test the workings of watermarks in a complete, running topology. All
-	 * tasks and stream operators have dedicated tests that test the watermark propagation
-	 * behaviour.
-	 */
-	@Test
-	public void testWatermarkPropagation() throws Exception {
-		final int NUM_WATERMARKS = 10;
-
-		long initialTime = 0L;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
-
-		source1.union(source2)
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-
-		// verify that all the watermarks arrived at the final custom operator
-		for (int i = 0; i < PARALLELISM; i++) {
-			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
-			// other source stops emitting after that
-			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
-					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-						System.err.println(CustomOperator.finalWatermarks[i].get(k));
-					}
-
-					fail("Wrong watermark.");
-				}
-			}
-			
-			assertEquals(Watermark.MAX_WATERMARK,
-					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
-		}
-	}
-
-	@Test
-	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
-		
-		// for this test to work, we need to be sure that no other jobs are being executed
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			Thread.sleep(100);
-		}
-		
-		final int NUM_WATERMARKS = 10;
-
-		long initialTime = 0L;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
-
-		source1.union(source2)
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-				.addSink(new NoOpSink<Integer>());
-
-		new Thread("stopper") {
-			@Override
-			public void run() {
-				try {
-					// try until we get the running jobs
-					List<JobID> running;
-					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-						Thread.sleep(50);
-					}
-
-					JobID id = running.get(0);
-					
-					// send stop until the job is stopped
-					do {
-						cluster.stopJob(id);
-						Thread.sleep(50);
-					} while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
-				}
-				catch (Throwable t) {
-					t.printStackTrace();
-				}
-			}
-		}.start();
-		
-		env.execute();
-
-		// verify that all the watermarks arrived at the final custom operator
-		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
-			
-			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
-			// other source stops emitting after that
-			for (int j = 0; j < subtaskWatermarks.size(); j++) {
-				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
-					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-						System.err.println(subtaskWatermarks.get(k));
-					}
-
-					fail("Wrong watermark.");
-				}
-			}
-			
-			// if there are watermarks, the final one must not be the MAX watermark
-			if (subtaskWatermarks.size() > 0) {
-				assertNotEquals(Watermark.MAX_WATERMARK,
-						subtaskWatermarks.get(subtaskWatermarks.size()-1));
-			}
-		}
-	}
-
-	/**
-	 * These check whether timestamps are properly assigned at the sources and handled in
-	 * network transmission and between chained operators when timestamps are enabled.
-	 */
-	@Test
-	public void testTimestampHandling() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-
-		source1
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
-				.addSink(new NoOpSink<Integer>());
-
-
-		env.execute();
-	}
-
-	/**
-	 * These check whether timestamps are properly ignored when they are disabled.
-	 */
-	@Test
-	public void testDisabledTimestamps() throws Exception {
-		final int NUM_ELEMENTS = 10;
-		
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-		
-		DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-
-		source1
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
-				.addSink(new NoOpSink<Integer>());
-		
-		env.execute();
-	}
-
-	/**
-	 * This tests whether timestamps are properly extracted in the timestamp
-	 * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
-	 * interval.
-	 */
-	@Test
-	public void testTimestampExtractorWithAutoInterval() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.getConfig().setAutoWatermarkInterval(10);
-		env.setParallelism(1);
-		env.getConfig().disableSysoutLogging();
-
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 1;
-				while (index <= NUM_ELEMENTS) {
-					ctx.collect(index);
-					latch.await();
-					index++;
-				}
-			}
-
-			@Override
-			public void cancel() {}
-		});
-
-		DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
-				new AscendingTimestampExtractor<Integer>() {
-					@Override
-					public long extractAscendingTimestamp(Integer element) {
-						return element;
-					}
-				});
-
-		extractOp
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-				.transform("Timestamp Check",
-						BasicTypeInfo.INT_TYPE_INFO,
-						new TimestampCheckingOperator());
-
-		// verify that extractor picks up source parallelism
-		Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
-
-		env.execute();
-
-		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-				long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
-				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
-			}
-		}
-		
-		// the input is finite, so it should have a MAX Watermark
-		assertEquals(Watermark.MAX_WATERMARK, 
-				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-	}
-
-	/**
-	 * This tests whether timestamps are properly extracted in the timestamp
-	 * extractor and whether watermarks are correctly forwarded from the custom watermark emit
-	 * function.
-	 */
-	@Test
-	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.getConfig().setAutoWatermarkInterval(10);
-		env.setParallelism(1);
-		env.getConfig().disableSysoutLogging();
-
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 1;
-				while (index <= NUM_ELEMENTS) {
-					ctx.collect(index);
-					latch.await();
-					index++;
-				}
-			}
-
-			@Override
-			public void cancel() {}
-		});
-
-		source1
-				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-					
-					@Override
-					public long extractTimestamp(Integer element, long currentTimestamp) {
-						return element;
-					}
-
-					@Override
-					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-						return new Watermark(extractedTimestamp - 1);
-					}
-				})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-
-		env.execute();
-
-		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-				Assert.fail("Wrong watermark.");
-			}
-		}
-
-		// the input is finite, so it should have a MAX Watermark
-		assertEquals(Watermark.MAX_WATERMARK,
-				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-	}
-
-	/**
-	 * This test verifies that the timestamp extractor does not emit decreasing watermarks even
-	 * when the assigner returns a lower watermark for an out-of-order element.
-	 */
-	@Test
-	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.getConfig().setAutoWatermarkInterval(1);
-		env.setParallelism(1);
-		env.getConfig().disableSysoutLogging();
-
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 1;
-				while (index <= NUM_ELEMENTS) {
-					ctx.collect(index);
-					Thread.sleep(100);
-					ctx.collect(index - 1);
-					latch.await();
-					index++;
-				}
-			}
-
-			@Override
-			public void cancel() {}
-		});
-
-		source1
-				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-
-					@Override
-					public long extractTimestamp(Integer element, long previousTimestamp) {
-						return element;
-					}
-
-					@Override
-					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-						return new Watermark(extractedTimestamp - 1);
-					}
-				})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-
-		env.execute();
-
-		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-				Assert.fail("Wrong watermark.");
-			}
-		}
-		// the input is finite, so it should have a MAX Watermark
-		assertEquals(Watermark.MAX_WATERMARK,
-				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-	}
-
-	/**
-	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
-	 */
-	@Test
-	public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.getConfig().setAutoWatermarkInterval(1);
-		env.setParallelism(2);
-		env.getConfig().disableSysoutLogging();
-
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 1;
-				while (index <= NUM_ELEMENTS) {
-					ctx.collectWithTimestamp(index, index);
-					ctx.collectWithTimestamp(index - 1, index - 1);
-					index++;
-					ctx.emitWatermark(new Watermark(index-2));
-				}
-
-				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
-				// we only see one in the result
-				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
-
-			@Override
-			public void cancel() {}
-		});
-
-		source1
-				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-
-					@Override
-					public long extractTimestamp(Integer element, long currentTimestamp) {
-						return element;
-					}
-
-					@Override
-					public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-						return null;
-					}
-				})
-			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
-
-
-		env.execute();
-
-		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
-		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
-	}
-
-	/**
-	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
-	 * 
-	 * Same test as before, but using a different timestamp extractor
-	 */
-	@Test
-	public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.getConfig().setAutoWatermarkInterval(10);
-		env.setParallelism(2);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 1;
-				while (index <= NUM_ELEMENTS) {
-					ctx.collectWithTimestamp(index, index);
-					ctx.collectWithTimestamp(index - 1, index - 1);
-					index++;
-					ctx.emitWatermark(new Watermark(index-2));
-				}
-
-				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
-				// we only see one in the result
-				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-				ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-			}
-
-			@Override
-			public void cancel() {}
-		});
-
-		source1
-				.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() {
-
-					@Override
-					public long extractTimestamp(Integer element, long currentTimestamp) {
-						return element;
-					}
-
-					@Override
-					public Watermark getCurrentWatermark() {
-						return null;
-					}
-				})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
-		
-		env.execute();
-
-		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
-		Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
-	}
-
-	/**
-	 * This verifies that an event time source works when setting stream time characteristic to
-	 * processing time. In this case, the watermarks should just be swallowed.
-	 */
-	@Test
-	public void testEventTimeSourceWithProcessingTime() throws Exception {
-		StreamExecutionEnvironment env = 
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		
-		env.setParallelism(2);
-		env.getConfig().disableSysoutLogging();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10));
-
-		source1
-			.map(new IdentityMap())
-			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
-
-		env.execute();
-
-		// verify that we don't get any watermarks, the source is used as watermark source in
-		// other tests, so it normally emits watermarks
-		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
-	}
-	
-	@Test
-	public void testErrorOnEventTimeOverProcessingTime() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-
-		env.setParallelism(2);
-		env.getConfig().disableSysoutLogging();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DataStream<Tuple2<String, Integer>> source1 = 
-				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
-
-		source1
-				.keyBy(0)
-				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
-						return value1;
-					}
-				})
-				.print();
-
-		try {
-			env.execute();
-			fail("this should fail with an exception");
-		} catch (Exception e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testErrorOnEventTimeWithoutTimestamps() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-
-		env.setParallelism(2);
-		env.getConfig().disableSysoutLogging();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		DataStream<Tuple2<String, Integer>> source1 =
-				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
-
-		source1
-				.keyBy(0)
-				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
-				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
-						return value1;
-					}
-				})
-				.print();
-
-		try {
-			env.execute();
-			fail("this should fail with an exception");
-		} catch (Exception e) {
-			// expected
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Custom Operators and Functions
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("unchecked")
-	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		List<Watermark> watermarks;
-		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
-		private final boolean timestampsEnabled;
-
-		public CustomOperator(boolean timestampsEnabled) {
-			setChainingStrategy(ChainingStrategy.ALWAYS);
-			this.timestampsEnabled = timestampsEnabled;
-		}
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (timestampsEnabled) {
-				if (element.getTimestamp() != element.getValue()) {
-					Assert.fail("Timestamps are not properly handled.");
-				}
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			for (Watermark previousMark: watermarks) {
-				assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
-			}
-			watermarks.add(mark);
-			latch.trigger();
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public void open() throws Exception {
-			super.open();
-			watermarks = new ArrayList<>();
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
-		}
-	}
-
-	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		public TimestampCheckingOperator() {
-			setChainingStrategy(ChainingStrategy.ALWAYS);
-		}
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.getTimestamp() != element.getValue()) {
-				Assert.fail("Timestamps are not properly handled.");
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {}
-	}
-
-	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.hasTimestamp()) {
-				Assert.fail("Timestamps are not properly handled.");
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {}
-	}
-
-	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
-		@Override
-		public Integer map1(Integer value) throws Exception {
-			return value;
-		}
-
-		@Override
-		public Integer map2(Integer value) throws Exception {
-			return value;
-		}
-	}
-
-	public static class IdentityMap implements MapFunction<Integer, Integer> {
-		@Override
-		public Integer map(Integer value) throws Exception {
-			return value;
-		}
-	}
-
-	public static class MyTimestampSource implements SourceFunction<Integer> {
-
-		private final long initialTime;
-		private final int numWatermarks;
-
-		public MyTimestampSource(long initialTime, int numWatermarks) {
-			this.initialTime = initialTime;
-			this.numWatermarks = numWatermarks;
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < numWatermarks; i++) {
-				ctx.collectWithTimestamp(i, initialTime + i);
-				ctx.emitWatermark(new Watermark(initialTime + i));
-			}
-		}
-
-		@Override
-		public void cancel() {}
-	}
-
-	public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
-
-		private final long initialTime;
-		private final int numWatermarks;
-
-		private volatile boolean running = true;
-		
-		public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
-			this.initialTime = initialTime;
-			this.numWatermarks = numWatermarks;
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < numWatermarks; i++) {
-				ctx.collectWithTimestamp(i, initialTime + i);
-				ctx.emitWatermark(new Watermark(initialTime + i));
-			}
-			
-			while (running) {
-				Thread.sleep(20);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-
-		@Override
-		public void stop() {
-			running = false;
-		}
-	}
-
-	public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
-
-		int numWatermarks;
-
-		public MyNonWatermarkingSource(int numWatermarks) {
-			this.numWatermarks = numWatermarks;
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < numWatermarks; i++) {
-				ctx.collect(i);
-			}
-		}
-
-		@Override
-		public void cancel() {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
deleted file mode 100644
index d398121..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public final class NoOpSink<T> extends RichSinkFunction<T> {
-	public void invoke(T tuple) {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
deleted file mode 100644
index a46ff55..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
-	private List<T> received;
-
-	public void invoke(T tuple) {
-		received.add(tuple);
-	}
-
-	public void open(Configuration conf) {
-		received = new ArrayList<T>();
-	}
-
-	public void close() {
-		assertTrue(received.size() > 0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
deleted file mode 100644
index 7d6a6d0..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Test base for streaming programs relying on an open server socket to write to.
- */
-public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
-
-	protected static final String HOST = "localhost";
-	protected static Integer port;
-	protected Set<String> dataReadFromSocket = new HashSet<String>();
-
-	@Override
-	protected void preSubmit() throws Exception {
-		port = NetUtils.getAvailablePort();
-		temporarySocket = createLocalSocket(port);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
-		Assert.assertEquals(expectedData, dataReadFromSocket);
-		temporarySocket.close();
-	}
-
-	protected ServerSocket temporarySocket;
-
-	public ServerSocket createLocalSocket(int port) throws Exception {
-		ServerSocket serverSocket = new ServerSocket(port);
-		ServerThread st = new ServerThread(serverSocket);
-		st.start();
-		return serverSocket;
-	}
-
-	protected class ServerThread extends Thread {
-
-		private ServerSocket serverSocket;
-		private Thread t;
-
-		public ServerThread(ServerSocket serverSocket) {
-			this.serverSocket = serverSocket;
-			t = new Thread(this);
-		}
-
-		public void waitForAccept() throws Exception {
-			Socket socket = serverSocket.accept();
-			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-			DeserializationSchema<String> schema = new SimpleStringSchema();
-			String rawData = in.readLine();
-			while (rawData != null){
-				String string = schema.deserialize(rawData.getBytes());
-				dataReadFromSocket.add(string);
-				rawData = in.readLine();
-			}
-			socket.close();
-		}
-
-		public void run() {
-			try {
-				waitForAccept();
-			} catch (Exception e) {
-				Assert.fail();
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public void start() {
-			t.start();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index 8cdedd5..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- *   {@literal @}Test
- *   public void someTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
-
-	// ------------------------------------------------------------------------
-	//  The mini cluster that is shared across tests
-	// ------------------------------------------------------------------------
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster;
-
-	public StreamingMultipleProgramsTestBase() {
-		super(new Configuration());
-	}
-
-	// ------------------------------------------------------------------------
-	//  Cluster setup & teardown
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
-		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		TestStreamEnvironment.unsetAsContext();
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
deleted file mode 100644
index 50ed1cf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	private int parallelism;
-	
-	
-	public StreamingProgramTestBase() {
-		super(new Configuration());
-		setParallelism(DEFAULT_PARALLELISM);
-	}
-
-
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-
-	// --------------------------------------------------------------------------------------------
-	//  Methods to create the test program and for pre- and post- test work
-	// --------------------------------------------------------------------------------------------
-
-	protected abstract void testProgram() throws Exception;
-
-	protected void preSubmit() throws Exception {}
-	
-	protected void postSubmit() throws Exception {}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Test entry point
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testJob() throws Exception {
-		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Pre-submit work caused an error: " + e.getMessage());
-			}
-
-			// prepare the test environment
-			startCluster();
-
-			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
-
-			// call the test program
-			try {
-				testProgram();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Error while calling the test program: " + e.getMessage());
-			}
-			finally {
-				TestStreamEnvironment.unsetAsContext();
-			}
-
-			// post-submit
-			try {
-				postSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Post-submit work caused an error: " + e.getMessage());
-			}
-		}
-		finally {
-			stopCluster();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
deleted file mode 100644
index 423d08e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public class TestListResultSink<T> extends RichSinkFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-	private int resultListId;
-
-	public TestListResultSink() {
-		this.resultListId = TestListWrapper.getInstance().createList();
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		synchronized (resultList()) {
-			resultList().add(value);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-	}
-
-	@SuppressWarnings("unchecked")
-	private List<T> resultList() {
-		synchronized (TestListWrapper.getInstance()) {
-			return (List<T>) TestListWrapper.getInstance().getList(resultListId);
-		}
-	}
-
-	public List<T> getResult() {
-		synchronized (resultList()) {
-			ArrayList<T> copiedList = new ArrayList<T>(resultList());
-			return copiedList;
-		}
-	}
-
-	public List<T> getSortedResult() {
-		synchronized (resultList()) {
-			TreeSet<T> treeSet = new TreeSet<T>(resultList());
-			ArrayList<T> sortedList = new ArrayList<T>(treeSet);
-			return sortedList;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
deleted file mode 100644
index 751f836..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestListWrapper {
-
-	private static TestListWrapper instance;
-
-	@SuppressWarnings("rawtypes")
-	private List<List<? extends Comparable>> lists;
-
-	@SuppressWarnings("rawtypes")
-	private TestListWrapper() {
-		lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
-	}
-
-	public static TestListWrapper getInstance() {
-		if (instance == null) {
-			instance = new TestListWrapper();
-		}
-		return instance;
-	}
-
-	/**
-	 * Creates and stores a list, returns with the id.
-	 *
-	 * @return The ID of the list.
-	 */
-	@SuppressWarnings("rawtypes")
-	public int createList() {
-		lists.add(new ArrayList<Comparable>());
-		return lists.size() - 1;
-	}
-
-	public List<?> getList(int listId) {
-		@SuppressWarnings("rawtypes")
-		List<? extends Comparable> list = lists.get(listId);
-		if (list == null) {
-			throw new RuntimeException("No such list.");
-		}
-
-		return list;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
deleted file mode 100644
index c700102..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
- */
-public class TestStreamEnvironment extends StreamExecutionEnvironment {
-	
-	/** The mini cluster in which this environment executes its jobs */
-	private ForkableFlinkMiniCluster executor;
-	
-
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
-		this.executor = Preconditions.checkNotNull(executor);
-		setParallelism(parallelism);
-	}
-	
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		final StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		final JobGraph jobGraph = streamGraph.getJobGraph();
-		return executor.submitJobAndWait(jobGraph, false);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
-	 * the given cluster with the given default parallelism.
-	 * 
-	 * @param cluster The test cluster to run the test program on.
-	 * @param parallelism The default parallelism for the test programs.
-	 */
-	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
-		
-		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
-			@Override
-			public StreamExecutionEnvironment createExecutionEnvironment() {
-				return new TestStreamEnvironment(cluster, parallelism);
-			}
-		};
-
-		initializeContextEnvironment(factory);
-	}
-
-	/**
-	 * Resets the streaming context environment to null.
-	 */
-	public static void unsetAsContext() {
-		resetContextEnvironment();
-	} 
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index ffbcb87..b82faf1 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -98,15 +98,6 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
-		<!-- To access streaming test utils -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
deleted file mode 100644
index 7b3ed67..0000000
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes fails most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
-		@Override
-		protected void testProgram() throws Exception {
-			OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
-		}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 3c35cf1..238c2da 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -58,6 +58,13 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>${junit.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
new file mode 100644
index 0000000..c5fbaf0
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
+
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	protected static ForkableFlinkMiniCluster cluster;
+
+	public StreamingMultipleProgramsTestBase() {
+		super(new Configuration());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Cluster setup & teardown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
+		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestStreamEnvironment.unsetAsContext();
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
new file mode 100644
index 0000000..50ed1cf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public abstract class StreamingProgramTestBase extends AbstractTestBase {
+
+	protected static final int DEFAULT_PARALLELISM = 4;
+
+	private int parallelism;
+	
+	
+	public StreamingProgramTestBase() {
+		super(new Configuration());
+		setParallelism(DEFAULT_PARALLELISM);
+	}
+
+
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		setTaskManagerNumSlots(parallelism);
+	}
+	
+	public int getParallelism() {
+		return parallelism;
+	}
+	
+
+	// --------------------------------------------------------------------------------------------
+	//  Methods to create the test program and for pre- and post- test work
+	// --------------------------------------------------------------------------------------------
+
+	protected abstract void testProgram() throws Exception;
+
+	protected void preSubmit() throws Exception {}
+	
+	protected void postSubmit() throws Exception {}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Test entry point
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testJob() throws Exception {
+		try {
+			// pre-submit
+			try {
+				preSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Pre-submit work caused an error: " + e.getMessage());
+			}
+
+			// prepare the test environment
+			startCluster();
+
+			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
+
+			// call the test program
+			try {
+				testProgram();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Error while calling the test program: " + e.getMessage());
+			}
+			finally {
+				TestStreamEnvironment.unsetAsContext();
+			}
+
+			// post-submit
+			try {
+				postSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Post-submit work caused an error: " + e.getMessage());
+			}
+		}
+		finally {
+			stopCluster();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
new file mode 100644
index 0000000..c700102
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ */
+public class TestStreamEnvironment extends StreamExecutionEnvironment {
+	
+	/** The mini cluster in which this environment executes its jobs */
+	private ForkableFlinkMiniCluster executor;
+	
+
+	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+		this.executor = Preconditions.checkNotNull(executor);
+		setParallelism(parallelism);
+	}
+	
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		final StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+		final JobGraph jobGraph = streamGraph.getJobGraph();
+		return executor.submitJobAndWait(jobGraph, false);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+	 * the given cluster with the given default parallelism.
+	 * 
+	 * @param cluster The test cluster to run the test program on.
+	 * @param parallelism The default parallelism for the test programs.
+	 */
+	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+		
+		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
+			@Override
+			public StreamExecutionEnvironment createExecutionEnvironment() {
+				return new TestStreamEnvironment(cluster, parallelism);
+			}
+		};
+
+		initializeContextEnvironment(factory);
+	}
+
+	/**
+	 * Resets the streaming context environment to null.
+	 */
+	public static void unsetAsContext() {
+		resetContextEnvironment();
+	} 
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 9fd8c3e..77216e0 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -542,7 +542,7 @@ under the License.
 										</goals>
 									</pluginExecutionFilter>
 									<action>
-										<ignore></ignore>
+										<ignore/>
 									</action>
 								</pluginExecution>
 							</pluginExecutions>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
new file mode 100644
index 0000000..5d99de4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
+
+	private String resultPath1;
+	private String resultPath2;
+	private String expected1;
+	private String expected2;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath1 = tempFolder.newFile().toURI().toString();
+		resultPath2 = tempFolder.newFile().toURI().toString();
+		expected1 = "";
+		expected2 = "";
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected1, resultPath1);
+		compareResultsByLinesInMemory(expected2, resultPath2);
+	}
+
+	/**
+	 * Tests the proper functioning of the streaming fold operator. For this purpose, a stream
+	 * of {@code Tuple2<Integer, Integer>} is created. The stream is grouped according to the first tuple
+	 * value. Each group is folded by summing up the second tuple value.
+	 *
+	 * <p>This test relies on the hash function used by {@link DataStream#keyBy}, which is
+	 * assumed to be {@link MathUtils#murmurHash}.
+	 */
+	@Test
+	public void testGroupedFoldOperation() throws Exception {
+		int numElements = 10;
+		final int numKeys = 2;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
+
+		SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
+			.keyBy(0)
+			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
+					return accumulator + value.f1;
+				}
+			}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+				int key = -1;
+				@Override
+				public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+					if (key == -1){
+						key = MathUtils.murmurHash(value) % numKeys;
+					}
+					return new Tuple2<>(key, value);
+				}
+			}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
+				@Override
+				public Iterable<String> select(Tuple2<Integer, Integer> value) {
+					List<String> output = new ArrayList<>();
+
+					output.add(value.f0 + "");
+					return output;
+				}
+			});
+
+		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
+			@Override
+			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f1;
+			}
+		}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
+			@Override
+			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f1;
+			}
+		}).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+		StringBuilder builder1 = new StringBuilder();
+		StringBuilder builder2 = new StringBuilder();
+		int counter1 = 0;
+		int counter2 = 0;
+
+		for (int i = 0; i < numElements; i++) {
+			if (MathUtils.murmurHash(i) % numKeys == 0) {
+				counter1 += i;
+				builder1.append(counter1 + "\n");
+			} else {
+				counter2 += i;
+				builder2.append(counter2 + "\n");
+			}
+		}
+
+		expected1 = builder1.toString();
+		expected2 = builder2.toString();
+
+		env.execute();
+	}
+
+	/**
+	 * Tests whether the fold operation can also be called with types that are not Java serializable.
+	 */
+	@Test
+	public void testFoldOperationWithNonJavaSerializableType() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
+
+		input
+			.keyBy(0)
+			.fold(
+				new NonSerializable(42),
+				new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
+					@Override
+					public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
+						return new NonSerializable(accumulator.value + value.f1.value);
+					}
+			})
+			.map(new MapFunction<NonSerializable, Integer>() {
+				@Override
+				public Integer map(NonSerializable value) throws Exception {
+					return value.value;
+				}
+			})
+			.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		StringBuilder builder = new StringBuilder();
+
+		for (int i = 0; i < numElements; i++) {
+			builder.append(42 + i + "\n");
+		}
+
+		expected1 = builder.toString();
+
+		env.execute();
+	}
+
+	private static class NonSerializable {
+		// This makes the type non-serializable
+		private final Object obj = new Object();
+
+		private final int value;
+
+		public NonSerializable(int value) {
+			this.value = value;
+		}
+	}
+
+	private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
+		private final int numElements;
+
+		public NonSerializableTupleSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
+			for (int i = 0; i < numElements; i++) {
+				ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
+			}
+		}
+
+		@Override
+		public void cancel() {}
+	}
+
+	private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
+
+		private final int numElements;
+		private final int numKeys;
+
+		public TupleSource(int numElements, int numKeys) {
+			this.numElements = numElements;
+			this.numKeys = numKeys;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			for (int i = 0; i < numElements; i++) {
+				// keys '1' and '2' hash to different buckets
+				Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
+				ctx.collect(result);
+			}
+		}
+
+		@Override
+		public void cancel() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..c2155ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<Tuple2<String, Integer>> counts = text
+				.flatMap(new Tokenizer())
+				.keyBy(0).sum(1);
+
+		counts.writeAsCsv(resultPath);
+
+		env.execute("WriteAsCsvTest");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// Strip the parentheses from the expected tuple-formatted text so that it matches the CSV output
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
new file mode 100644
index 0000000..2940e6d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.outputformat;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<Tuple2<String, Integer>> counts = text
+				.flatMap(new CsvOutputFormatITCase.Tokenizer())
+				.keyBy(0).sum(1);
+
+		counts.writeAsText(resultPath);
+
+		env.execute("WriteAsTextTest");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
new file mode 100644
index 0000000..d21985b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+@SuppressWarnings("serial")
+public class ChainedRuntimeContextITCase extends StreamingMultipleProgramsTestBase {
+	private static RuntimeContext srcContext;
+	private static RuntimeContext mapContext;
+
+	@Test
+	public void test() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink<Integer>());
+		env.execute();
+
+		assertNotEquals(srcContext, mapContext);
+
+	}
+
+	private static class TestSource extends RichParallelSourceFunction<Integer> {
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+		}
+
+		@Override
+		public void cancel() {
+		}
+
+		@Override
+		public void open(Configuration c) {
+			srcContext = getRuntimeContext();
+		}
+
+	}
+
+	private static class TestMap extends RichMapFunction<Integer, Integer> {
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public void open(Configuration c) {
+			mapContext = getRuntimeContext();
+		}
+
+	}
+
+}


Mime
View raw message