flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [21/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
deleted file mode 100644
index b94e530..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@RunWith(Parameterized.class)
-public class WindowOperatorTest {
-
-	@SuppressWarnings("unchecked,rawtypes")
-	private WindowBufferFactory windowBufferFactory;
-
-	public WindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) {
-		this.windowBufferFactory = windowBufferFactory;
-	}
-
-	// For counting if close() is called the correct number of times on the SumReducer
-	private static AtomicInteger closeCalled = new AtomicInteger(0);
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSlidingEventTimeWindows() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-		final int WINDOW_SLIDE = 1;
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				windowBufferFactory,
-				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				EventTimeTrigger.create());
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
-		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
-		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
-		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
-		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
-		expectedOutput.add(new Watermark(6999));
-		expectedOutput.add(new Watermark(7999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testTumblingEventTimeWindows() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				windowBufferFactory,
-				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				EventTimeTrigger.create());
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
-		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
-		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
-		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
-		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
-		expectedOutput.add(new Watermark(6999));
-		expectedOutput.add(new Watermark(7999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testContinuousWatermarkTrigger() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				windowBufferFactory,
-				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1000));
-		expectedOutput.add(new Watermark(1000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 2000));
-		expectedOutput.add(new Watermark(2000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3000));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
-		expectedOutput.add(new Watermark(3000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4000));
-		expectedOutput.add(new Watermark(4000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5000));
-		expectedOutput.add(new Watermark(5000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
-		expectedOutput.add(new Watermark(6000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 8000));
-		expectedOutput.add(new Watermark(7000));
-		expectedOutput.add(new Watermark(8000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCountTrigger() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 4;
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				windowBufferFactory,
-				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
-				"Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private boolean openCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			closeCalled.incrementAndGet();
-		}
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called");
-			}
-			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Parametrization for testing different window buffers
-	// ------------------------------------------------------------------------
-
-	@Parameterized.Parameters(name = "WindowBuffer = {0}")
-	@SuppressWarnings("unchecked,rawtypes")
-	public static Collection<WindowBufferFactory[]> windowBuffers(){
-		return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
-				new WindowBufferFactory[]{new HeapWindowBuffer.Factory()}
-				);
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
-				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					return sr0.getValue().f1 - sr1.getValue().f1;
-				}
-			}
-		}
-	}
-
-	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
deleted file mode 100644
index 13766a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
- */
-public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * These tests ensure that the fast aligned time windows operator is used if the
-	 * conditions are right.
-	 */
-	@Test
-	public void testFastTimeWindows() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
-	}
-
-	/**
-	 * These tests ensure that the correct trigger is set when using event-time windows.
-	 */
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEventTime() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testNonEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.trigger(CountTrigger.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.evictor(CountEvictor.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
-		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
-		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-			return value1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
deleted file mode 100644
index a1cea13..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BroadcastPartitionerTest {
-
-	private BroadcastPartitioner<Tuple> broadcastPartitioner1;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-	
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
-
-	@Before
-	public void setPartitioner() {
-		broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
-		broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
-		broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
-
-	}
-
-	@Test
-	public void testSelectChannels() {
-		int[] first = new int[] { 0 };
-		int[] second = new int[] { 0, 1 };
-		int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-		sd.setInstance(streamRecord);
-		assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
-		assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
-		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
deleted file mode 100644
index f7bd739..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ForwardPartitionerTest {
-
-	private ForwardPartitioner<Tuple> forwardPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		forwardPartitioner = new ForwardPartitioner<Tuple>();
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 2)[0]);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 1024)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
deleted file mode 100644
index 6ae3730..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GlobalPartitionerTest {
-
-	private GlobalPartitioner<Tuple> globalPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		globalPartitioner = new GlobalPartitioner<Tuple>();
-	}
-
-	@Test
-	public void testSelectChannels() {
-		int[] result = new int[] { 0 };
-
-		sd.setInstance(streamRecord);
-
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
deleted file mode 100644
index 6dbf932..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HashPartitionerTest {
-
-	private HashPartitioner<Tuple2<String, Integer>> hashPartitioner;
-	private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0));
-	private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42));
-	private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
-	private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
-
-	@Before
-	public void setPartitioner() {
-		hashPartitioner = new HashPartitioner<Tuple2<String, Integer>>(new KeySelector<Tuple2<String, Integer>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple2<String, Integer> value) throws Exception {
-				return value.getField(0);
-			}
-		});
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd1.setInstance(streamRecord1);
-		assertEquals(1, hashPartitioner.selectChannels(sd1, 1).length);
-		assertEquals(1, hashPartitioner.selectChannels(sd1, 2).length);
-		assertEquals(1, hashPartitioner.selectChannels(sd1, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsGrouping() {
-		sd1.setInstance(streamRecord1);
-		sd2.setInstance(streamRecord2);
-
-		assertArrayEquals(hashPartitioner.selectChannels(sd1, 1),
-				hashPartitioner.selectChannels(sd2, 1));
-		assertArrayEquals(hashPartitioner.selectChannels(sd1, 2),
-				hashPartitioner.selectChannels(sd2, 2));
-		assertArrayEquals(hashPartitioner.selectChannels(sd1, 1024),
-				hashPartitioner.selectChannels(sd2, 1024));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
deleted file mode 100644
index aa70e8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RebalancePartitionerTest {
-	
-	private RebalancePartitioner<Tuple> distributePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-	
-	@Before
-	public void setPartitioner() {
-		distributePartitioner = new RebalancePartitioner<Tuple>();
-	}
-	
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
-	}
-	
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
deleted file mode 100644
index aff177c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ShufflePartitionerTest {
-
-	private ShufflePartitioner<Tuple> shufflePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		shufflePartitioner = new ShufflePartitioner<Tuple>();
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
-
-		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
-		assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
-
-		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
-		assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
deleted file mode 100644
index d48f7f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class StreamRecordSerializerTest {
-	
-	@Test
-	public void testDeepDuplication() {
-		try {
-			@SuppressWarnings("unchecked")
-			TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
-			@SuppressWarnings("unchecked")
-			TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
-			
-			when(serializer1.duplicate()).thenReturn(serializer2);
-			
-			StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(serializer1);
-			assertEquals(serializer1, streamRecSer.getContainedTypeSerializer());
-			
-			StreamRecordSerializer<Long> copy = streamRecSer.duplicate();
-			assertNotEquals(copy, streamRecSer);
-			assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testBasicProperties() {
-		try {
-			StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
-			
-			assertFalse(streamRecSer.isImmutableType());
-			assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass());
-			assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength());
-			
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
deleted file mode 100644
index 4c6957b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link OneInputStreamTask}.
- *
- * <p>
- * Note:<br>
- * We only use a {@link StreamMap} operator here. We also test the individual operators but Map is
- * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all
- * OneInputStreamOperators.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class OneInputStreamTaskTest {
-
-	/**
-	 * This test verifies that open() and close() are correctly called. This test also verifies
-	 * that timestamps of emitted elements are correct. {@link StreamMap} assigns the input
-	 * timestamp to emitted elements.
-	 */
-	@Test
-	public void testOpenCloseAndTimestamps() throws Exception {
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
-		streamConfig.setStreamOperator(mapOperator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.invoke();
-
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1));
-		testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
-	}
-
-	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
-	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
-		streamConfig.setStreamOperator(mapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processElement(new Watermark(initialTime), 0, 0);
-		testHarness.processElement(new Watermark(initialTime), 0, 1);
-		testHarness.processElement(new Watermark(initialTime), 1, 0);
-
-		// now the output should still be empty
-		testHarness.waitForInputProcessing();
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.processElement(new Watermark(initialTime), 1, 1);
-
-		// now the watermark should have propagated, Map simply forward Watermarks
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
-
-		// contrary to checkpoint barriers these elements are not blocked by watermarks
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
-		testHarness.processElement(new StreamRecord<String>("Ciao", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao", initialTime));
-
-		testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
-		testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
-		testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
-		testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
-
-		// check whether we get the minimum of all the watermarks, this must also only occur in
-		// the output after the two StreamRecords
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime + 2));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
-		// advance watermark from one of the inputs, now we should get a now one since the
-		// minimum increases
-		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime + 3));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		// advance the other two inputs, now we should get a new one since the
-		// minimum increases again
-		testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
-		testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime + 4));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(2, resultElements.size());
-	}
-
-	/**
-	 * This test verifies that checkpoint barriers are correctly forwarded.
-	 */
-	@Test
-	public void testCheckpointBarriers() throws Exception {
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
-		streamConfig.setStreamOperator(mapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
-		// These elements should be buffered until we receive barriers from
-		// all inputs
-		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
-		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
-		// on that input, only add to same input, otherwise we would not know the ordering
-		// of the output since the Task might read the inputs in any order
-		testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
-		testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
-		expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
-
-		testHarness.waitForInputProcessing();
-		// we should not yet see the barrier, only the two elements from non-blocked input
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
-		testHarness.waitForInputProcessing();
-
-		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
-		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	/**
-	 * This test verifies that checkpoint barriers and barrier buffers work correctly with
-	 * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
-	 * some inputs receive barriers from an earlier checkpoint, thereby blocking,
-	 * then all inputs receive barriers from a later checkpoint.
-	 */
-	@Test
-	public void testOvertakingCheckpointBarriers() throws Exception {
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
-		streamConfig.setStreamOperator(mapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
-		// These elements should be buffered until we receive barriers from
-		// all inputs
-		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
-		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
-		// on that input, only add to same input, otherwise we would not know the ordering
-		// of the output since the Task might read the inputs in any order
-		testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
-		testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
-		expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
-
-		testHarness.waitForInputProcessing();
-		// we should not yet see the barrier, only the two elements from non-blocked input
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		// Now give a later barrier to all inputs, this should unblock the first channel,
-		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
-
-		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
-
-		testHarness.waitForInputProcessing();
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
-		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
-		testHarness.waitForInputProcessing();
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value;
-		}
-	}
-
-	private static class IdentityMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
deleted file mode 100644
index 7fb8ba3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-
-import java.io.IOException;
-
-
-/**
- * Test harness for testing a {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
- * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
- * thread to finish. Use {@link #processElement} to send elements to the task. Use
- * {@link #processEvent(AbstractEvent)} to send events to the task.
- * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
- * that data entry is finished.
- *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
- * queues are empty. This must be used after entering some elements before checking the
- * desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
-
-	private TypeInformation<IN> inputType;
-	private TypeSerializer<IN> inputSerializer;
-
-	/**
-	 * Creates a test harness with the specified number of input gates and specified number
-	 * of channels per input gate.
-	 */
-	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-			int numInputGates,
-			int numInputChannelsPerGate,
-			TypeInformation<IN> inputType,
-			TypeInformation<OUT> outputType) {
-		super(task, outputType);
-
-		this.inputType = inputType;
-		inputSerializer = inputType.createSerializer(executionConfig);
-
-		this.numInputGates = numInputGates;
-		this.numInputChannelsPerGate = numInputChannelsPerGate;
-	}
-
-	/**
-	 * Creates a test harness with one input gate that has one input channel.
-	 */
-	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-			TypeInformation<IN> inputType,
-			TypeInformation<OUT> outputType) {
-		this(task, 1, 1, inputType, outputType);
-	}
-
-	@Override
-	protected void initializeInputs() throws IOException, InterruptedException {
-		inputGates = new StreamTestSingleInputGate[numInputGates];
-
-		for (int i = 0; i < numInputGates; i++) {
-			inputGates[i] = new StreamTestSingleInputGate<IN>(
-					numInputChannelsPerGate,
-					bufferSize,
-					inputSerializer);
-			this.mockEnv.addInputGate(inputGates[i].getInputGate());
-		}
-
-
-		streamConfig.setNumberOfInputs(1);
-		streamConfig.setTypeSerializerIn1(inputSerializer);
-	}
-
-}
-


Mime
View raw message