flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [22/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:38 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
deleted file mode 100644
index 3c1c24b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-	
-	private final List<T> elements = new ArrayList<>();
-
-	private final int timeStampModulus;
-
-
-	public CollectingOutput() {
-		this.timeStampModulus = 0;
-	}
-	
-	public CollectingOutput(int timeStampModulus) {
-		this.timeStampModulus = timeStampModulus;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public List<T> getElements() {
-		return elements;
-	}
-	
-	public void waitForNElements(int n, long timeout) throws InterruptedException {
-		long deadline = System.currentTimeMillis() + timeout;
-		synchronized (elements) {
-			long now;
-			while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
-				elements.wait(deadline - now);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void emitWatermark(Watermark mark) {
-		throw new UnsupportedOperationException("the output should not emit watermarks");
-	}
-
-	@Override
-	public void collect(StreamRecord<T> record) {
-		elements.add(record.getValue());
-		
-		if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
-			throw new IllegalArgumentException("Invalid timestamp");
-		}
-		synchronized (elements) {
-			elements.notifyAll();
-		}
-	}
-
-	@Override
-	public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
deleted file mode 100644
index 39033cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class EvictingNonKeyedWindowOperatorTest {
-
-	// For counting if close() is called the correct number of times on the SumReducer
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCountTrigger() throws Exception {
-		AtomicInteger closeCalled = new AtomicInteger(0);
-
-		final int WINDOW_SIZE = 4;
-		final int WINDOW_SLIDE = 2;
-
-		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
-				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
-				CountTrigger.of(WINDOW_SLIDE),
-				CountEvictor.of(WINDOW_SIZE));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-
-		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
-
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private boolean openCalled = false;
-
-		private  AtomicInteger closeCalled;
-
-		public SumReducer(AtomicInteger closeCalled) {
-			this.closeCalled = closeCalled;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			closeCalled.incrementAndGet();
-		}
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called");
-			}
-			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
-				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					return sr0.getValue().f1 - sr1.getValue().f1;
-				}
-			}
-		}
-	}
-
-	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
deleted file mode 100644
index 1821308..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class EvictingWindowOperatorTest {
-
-	// For counting if close() is called the correct number of times on the SumReducer
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCountTrigger() throws Exception {
-		AtomicInteger closeCalled = new AtomicInteger(0);
-
-		final int WINDOW_SIZE = 4;
-		final int WINDOW_SLIDE = 2;
-
-		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
-				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
-				CountTrigger.of(WINDOW_SLIDE),
-				CountEvictor.of(WINDOW_SIZE));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-
-		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-
-
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private boolean openCalled = false;
-
-		private  AtomicInteger closeCalled;
-
-		public SumReducer(AtomicInteger closeCalled) {
-			this.closeCalled = closeCalled;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			closeCalled.incrementAndGet();
-		}
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called");
-			}
-			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
-				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					return sr0.getValue().f1 - sr1.getValue().f1;
-				}
-			}
-		}
-	}
-
-	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
deleted file mode 100644
index c0b20a3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutIfAbsentTest {
-	
-	@Test
-	public void testPutIfAbsentUniqueKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-			IntegerFactory factory = new IntegerFactory();
-			
-			final int numElements = 1000000;
-			
-			for (int i = 0; i < numElements; i++) {
-				factory.set(2 * i + 1);
-				map.putIfAbsent(i, factory);
-
-				assertEquals(i+1, map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
-				assertTrue(map.size() <= map.getRehashThreshold());
-			}
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-			
-			for (int i = numElements - 1; i >= 0; i--) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testPutIfAbsentDuplicateKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-			IntegerFactory factory = new IntegerFactory();
-			
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				int val = 2 * i + 1;
-				factory.set(val);
-				Integer put = map.putIfAbsent(i, factory);
-				assertEquals(val, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i += 3) {
-				factory.set(2 * i);
-				Integer put = map.putIfAbsent(i, factory);
-				assertEquals(2 * i + 1, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private static class IntegerFactory implements KeyMap.LazyFactory<Integer> {
-		
-		private Integer toCreate;
-		
-		public void set(Integer toCreate) {
-			this.toCreate = toCreate;
-		}
-
-		@Override
-		public Integer create() {
-			return toCreate;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
deleted file mode 100644
index 09c44fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.junit.Test;
-
-import java.util.BitSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutTest {
-
-	@Test
-	public void testPutUniqueKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				map.put(i, 2 * i + 1);
-
-				assertEquals(i+1, map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
-				assertTrue(map.size() <= map.getRehashThreshold());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			for (int i = numElements - 1; i >= 0; i--) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			BitSet bitset = new BitSet();
-			int numContained = 0;
-			for (KeyMap.Entry<Integer, Integer> entry : map) {
-				numContained++;
-				
-				assertEquals(entry.getKey() * 2 + 1, entry.getValue().intValue());
-				assertFalse(bitset.get(entry.getKey()));
-				bitset.set(entry.getKey());
-			}
-
-			assertEquals(numElements, numContained);
-			assertEquals(numElements, bitset.cardinality());
-			
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPutDuplicateKeysAndGrowth() {
-		try {
-			final KeyMap<Integer, Integer> map = new KeyMap<>();
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				Integer put = map.put(i, 2*i+1);
-				assertNull(put);
-			}
-
-			for (int i = 0; i < numElements; i += 3) {
-				Integer put = map.put(i, 2*i);
-				assertNotNull(put);
-				assertEquals(2*i+1, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i++) {
-				int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
-				assertEquals(expected, map.get(i).intValue());
-			}
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-
-			
-			BitSet bitset = new BitSet();
-			int numContained = 0;
-			for (KeyMap.Entry<Integer, Integer> entry : map) {
-				numContained++;
-
-				int key = entry.getKey();
-				int expected = key % 3 == 0 ? (2*key) : (2*key+1);
-
-				assertEquals(expected, entry.getValue().intValue());
-				assertFalse(bitset.get(key));
-				bitset.set(key);
-			}
-
-			assertEquals(numElements, numContained);
-			assertEquals(numElements, bitset.cardinality());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
deleted file mode 100644
index 49310df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class KeyMapTest {
-	
-	@Test
-	public void testInitialSizeComputation() {
-		try {
-			KeyMap<String, String> map;
-
-			map = new KeyMap<>();
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-			
-			map = new KeyMap<>(0);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(1);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(9);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(63);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(64);
-			assertEquals(128, map.getCurrentTableCapacity());
-			assertEquals(7, map.getLog2TableCapacity());
-			assertEquals(23, map.getShift());
-			assertEquals(96, map.getRehashThreshold());
-
-			map = new KeyMap<>(500);
-			assertEquals(512, map.getCurrentTableCapacity());
-			assertEquals(9, map.getLog2TableCapacity());
-			assertEquals(21, map.getShift());
-			assertEquals(384, map.getRehashThreshold());
-
-			map = new KeyMap<>(127);
-			assertEquals(128, map.getCurrentTableCapacity());
-			assertEquals(7, map.getLog2TableCapacity());
-			assertEquals(23, map.getShift());
-			assertEquals(96, map.getRehashThreshold());
-			
-			// no negative number of elements
-			try {
-				new KeyMap<>(-1);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-			
-			// check integer overflow
-			try {
-				map = new KeyMap<>(0x65715522);
-
-				final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE);
-				assertEquals(Integer.highestOneBit(Integer.MAX_VALUE), map.getCurrentTableCapacity());
-				assertEquals(30, map.getLog2TableCapacity());
-				assertEquals(0, map.getShift());
-				assertEquals(maxCap / 4 * 3, map.getRehashThreshold());
-			}
-			catch (OutOfMemoryError e) {
-				// this may indeed happen in small test setups. we tolerate this in this test
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPutAndGetRandom() {
-		try {
-			final KeyMap<Integer, Integer> map = new KeyMap<>();
-			final Random rnd = new Random();
-			
-			final long seed = rnd.nextLong();
-			final int numElements = 10000;
-			
-			final HashMap<Integer, Integer> groundTruth = new HashMap<>();
-			
-			rnd.setSeed(seed);
-			for (int i = 0; i < numElements; i++) {
-				Integer key = rnd.nextInt();
-				Integer value = rnd.nextInt();
-				
-				if (rnd.nextBoolean()) {
-					groundTruth.put(key, value);
-					map.put(key, value);
-				}
-			}
-
-			rnd.setSeed(seed);
-			for (int i = 0; i < numElements; i++) {
-				Integer key = rnd.nextInt();
-
-				// skip these, evaluating it is tricky due to duplicates
-				rnd.nextInt();
-				rnd.nextBoolean();
-				
-				Integer expected = groundTruth.get(key);
-				if (expected == null) {
-					assertNull(map.get(key));
-				}
-				else {
-					Integer contained = map.get(key);
-					assertNotNull(contained);
-					assertEquals(expected, contained);
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConjunctTraversal() {
-		try {
-			final Random rootRnd = new Random(654685486325439L);
-			
-			final int numMaps = 7;
-			final int numKeys = 1000000;
-
-			// ------ create a set of maps ------
-			@SuppressWarnings("unchecked")
-			final KeyMap<Integer, Integer>[] maps = (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps];
-			for (int i = 0; i < numMaps; i++) {
-				maps[i] = new KeyMap<>();
-			}
-			
-			// ------ prepare probabilities for maps ------
-			final double[] probabilities = new double[numMaps];
-			final double[] probabilitiesTemp = new double[numMaps];
-			{
-				probabilities[0] = 0.5;
-				double remainingProb = 1.0 - probabilities[0];
-				for (int i = 1; i < numMaps - 1; i++) {
-					remainingProb /= 2;
-					probabilities[i] = remainingProb;
-				}
-
-				// compensate for rounding errors
-				probabilities[numMaps - 1] = remainingProb;
-			}
-			
-			// ------ generate random elements ------
-			final long probSeed = rootRnd.nextLong();
-			final long keySeed = rootRnd.nextLong();
-			
-			final Random probRnd = new Random(probSeed);
-			final Random keyRnd = new Random(keySeed);
-			
-			final int maxStride = Integer.MAX_VALUE / numKeys;
-			
-			int totalNumElements = 0;
-			int nextKeyValue = 1;
-			
-			for (int i = 0; i < numKeys; i++) {
-				int numCopies = (nextKeyValue % 3) + 1;
-				System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps);
-				
-				double totalProb = 1.0;
-				for (int copy = 0; copy < numCopies; copy++) {
-					int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd);
-					totalProb -= probabilitiesTemp[pos];
-					probabilitiesTemp[pos] = 0.0;
-					
-					Integer boxed = nextKeyValue;
-					Integer previous = maps[pos].put(boxed, boxed);
-					assertNull("Test problem - test does not assign unique maps", previous);
-				}
-				
-				totalNumElements += numCopies;
-				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
-			}
-			
-			
-			// check that all maps contain the total number of elements
-			int numContained = 0;
-			for (KeyMap<?, ?> map : maps) {
-				numContained += map.size();
-			}
-			assertEquals(totalNumElements, numContained);
-
-			// ------ check that all elements can be found in the maps ------
-			keyRnd.setSeed(keySeed);
-			
-			numContained = 0;
-			nextKeyValue = 1;
-			for (int i = 0; i < numKeys; i++) {
-				int numCopiesExpected = (nextKeyValue % 3) + 1;
-				int numCopiesContained = 0;
-				
-				for (KeyMap<Integer, Integer> map : maps) {
-					Integer val = map.get(nextKeyValue);
-					if (val != null) {
-						assertEquals(nextKeyValue, val.intValue());
-						numCopiesContained++;
-					}
-				}
-				
-				assertEquals(numCopiesExpected, numCopiesContained);
-				numContained += numCopiesContained;
-				
-				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
-			}
-			assertEquals(totalNumElements, numContained);
-
-			// ------ make a traversal over all keys and validate the keys in the traversal ------
-			final int[] keysStartedAndFinished = { 0, 0 };
-			KeyMap.TraversalEvaluator<Integer, Integer> traversal = new KeyMap.TraversalEvaluator<Integer, Integer>() {
-
-				private int key;
-				private int valueCount;
-				
-				@Override
-				public void startNewKey(Integer key) {
-					this.key = key;
-					this.valueCount = 0;
-					
-					keysStartedAndFinished[0]++;
-				}
-
-				@Override
-				public void nextValue(Integer value) {
-					assertEquals(this.key, value.intValue());
-					this.valueCount++;
-				}
-
-				@Override
-				public void keyDone() {
-					int expected = (key % 3) + 1;
-					if (expected != valueCount) {
-						fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount);
-					}
-					
-					keysStartedAndFinished[1]++;
-				}
-			};
-			
-			KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17);
-			
-			assertEquals(numKeys, keysStartedAndFinished[0]);
-			assertEquals(numKeys, keysStartedAndFinished[1]);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSizeComparator() {
-		try {
-			KeyMap<String, String> map1 = new KeyMap<>(5);
-			KeyMap<String, String> map2 = new KeyMap<>(80);
-			
-			assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity());
-			
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) {
-		double val = rnd.nextDouble() * totalProbability;
-		
-		double accum = 0;
-		for (int i = 0; i < array.length; i++) {
-			accum += array[i];
-			if (val <= accum && array[i] > 0.0) {
-				return i;
-			}
-		}
-		
-		// in case of rounding errors
-		return array.length - 1;
-	}
-	
-	private static <E> E[] shuffleArray(E[] array, Random rnd) {
-		E[] target = Arrays.copyOf(array, array.length);
-		
-		for (int i = target.length - 1; i > 0; i--) {
-			int swapPos = rnd.nextInt(i + 1);
-			E temp = target[i];
-			target[i] = target[swapPos];
-			target[swapPos] = temp;
-		}
-		
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
deleted file mode 100644
index 02e032a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@RunWith(Parameterized.class)
-public class NonKeyedWindowOperatorTest {
-
-	@SuppressWarnings("unchecked,rawtypes")
-	private WindowBufferFactory windowBufferFactory;
-
-	public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) {
-		this.windowBufferFactory = windowBufferFactory;
-	}
-
-	// For counting if close() is called the correct number of times on the SumReducer
-	private static AtomicInteger closeCalled = new AtomicInteger(0);
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSlidingEventTimeWindows() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-		final int WINDOW_SLIDE = 1;
-
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				windowBufferFactory,
-				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				EventTimeTrigger.create());
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
-		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
-		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
-		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
-		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
-		expectedOutput.add(new Watermark(6999));
-		expectedOutput.add(new Watermark(7999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testTumblingEventTimeWindows() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				windowBufferFactory,
-				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				EventTimeTrigger.create());
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
-		expectedOutput.add(new Watermark(2999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3999));
-		expectedOutput.add(new Watermark(3999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4999));
-		expectedOutput.add(new Watermark(4999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
-		expectedOutput.add(new Watermark(5999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 6999));
-		testHarness.processWatermark(new Watermark(initialTime + 7999));
-		expectedOutput.add(new Watermark(6999));
-		expectedOutput.add(new Watermark(7999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testContinuousWatermarkTrigger() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 3;
-
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				windowBufferFactory,
-				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 1000));
-		expectedOutput.add(new Watermark(1000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processWatermark(new Watermark(initialTime + 2000));
-		expectedOutput.add(new Watermark(2000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 3000));
-		expectedOutput.add(new Watermark(3000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 4000));
-		expectedOutput.add(new Watermark(4000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 5000));
-		expectedOutput.add(new Watermark(5000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
-		expectedOutput.add(new Watermark(6000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		// those don't have any effect...
-		testHarness.processWatermark(new Watermark(initialTime + 7000));
-		testHarness.processWatermark(new Watermark(initialTime + 8000));
-		expectedOutput.add(new Watermark(7000));
-		expectedOutput.add(new Watermark(8000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCountTrigger() throws Exception {
-		closeCalled.set(0);
-
-		final int WINDOW_SIZE = 4;
-
-		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
-				GlobalWindows.create(),
-				new GlobalWindow.Serializer(),
-				windowBufferFactory,
-				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
-				"Tuple2<String, Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		// The global window actually ignores these timestamps...
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
-			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
-		} else {
-			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
-		}
-
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private boolean openCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			closeCalled.incrementAndGet();
-		}
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called");
-			}
-			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Parametrization for testing different window buffers
-	// ------------------------------------------------------------------------
-
-	@Parameterized.Parameters(name = "WindowBuffer = {0}")
-	@SuppressWarnings("unchecked,rawtypes")
-	public static Collection<WindowBufferFactory[]> windowBuffers(){
-		return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
-				new WindowBufferFactory[]{new HeapWindowBuffer.Factory()}
-				);
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
-				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					return sr0.getValue().f1 - sr1.getValue().f1;
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
deleted file mode 100644
index 76c6f20..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
- */
-public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * These tests ensure that the fast aligned time windows operator is used if the
-	 * conditions are right.
-	 */
-	@Test
-	public void testFastTimeWindows() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(Tuple tuple,
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
-	}
-
-	/**
-	 * These tests ensure that the fast aligned time windows operator is used if the
-	 * conditions are right.
-	 *
-	 * TODO: update once the fast aligned time windows operator is in
-	 */
-	@Ignore
-	@Test
-	public void testNonParallelFastTimeWindows() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS),
-						Time.of(100, TimeUnit.MILLISECONDS))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS))
-				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-			return value1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
deleted file mode 100644
index fb7142b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
- * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
- */
-public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
-
-	private static List<String> testResults;
-
-	@Test
-	public void testFoldWindow() throws Exception {
-
-		testResults = Lists.newArrayList();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple2.of("a", 0));
-				ctx.collect(Tuple2.of("a", 1));
-				ctx.collect(Tuple2.of("a", 2));
-
-				ctx.collect(Tuple2.of("b", 3));
-				ctx.collect(Tuple2.of("b", 4));
-				ctx.collect(Tuple2.of("b", 5));
-
-				ctx.collect(Tuple2.of("a", 6));
-				ctx.collect(Tuple2.of("a", 7));
-				ctx.collect(Tuple2.of("a", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
-
-		source1
-				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-					@Override
-					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
-							Tuple2<String, Integer> value) throws Exception {
-						accumulator.f0 += value.f0;
-						accumulator.f1 += value.f1;
-						return accumulator;
-					}
-				})
-				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
-					@Override
-					public void invoke(Tuple2<String, Integer> value) throws Exception {
-						testResults.add(value.toString());
-					}
-				});
-
-		env.execute("Fold Window Test");
-
-		List<String> expectedResult = Lists.newArrayList(
-				"(R:aaa,3)",
-				"(R:aaa,21)",
-				"(R:bbb,12)");
-
-		Collections.sort(expectedResult);
-		Collections.sort(testResults);
-
-		Assert.assertEquals(expectedResult, testResults);
-	}
-
-	@Test
-	public void testFoldAllWindow() throws Exception {
-
-		testResults = Lists.newArrayList();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple2.of("a", 0));
-				ctx.collect(Tuple2.of("a", 1));
-				ctx.collect(Tuple2.of("a", 2));
-
-				ctx.collect(Tuple2.of("b", 3));
-				ctx.collect(Tuple2.of("a", 3));
-				ctx.collect(Tuple2.of("b", 4));
-				ctx.collect(Tuple2.of("a", 4));
-				ctx.collect(Tuple2.of("b", 5));
-				ctx.collect(Tuple2.of("a", 5));
-
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
-
-		source1
-				.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-					@Override
-					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
-							Tuple2<String, Integer> value) throws Exception {
-						accumulator.f0 += value.f0;
-						accumulator.f1 += value.f1;
-						return accumulator;
-					}
-				})
-				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
-					@Override
-					public void invoke(Tuple2<String, Integer> value) throws Exception {
-						testResults.add(value.toString());
-					}
-				});
-
-		env.execute("Fold All-Window Test");
-
-		List<String> expectedResult = Lists.newArrayList(
-				"(R:aaa,3)",
-				"(R:bababa,24)");
-
-		Collections.sort(expectedResult);
-		Collections.sort(testResults);
-
-		Assert.assertEquals(expectedResult, testResults);
-	}
-
-	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
-			return element.f1;
-		}
-
-		@Override
-		public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
-			return element.f1 - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-}


Mime
View raw message