flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [06/19] flink git commit: [streaming] Major internal renaming and restructure
Date Wed, 15 Apr 2015 09:38:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
deleted file mode 100644
index 7049cb4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedStreamDiscretizerTest {
-
-	KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<Integer, String> value) throws Exception {
-			return value.f1;
-		}
-	};
-
-	/**
-	 * Test for not active distributed triggers with single field
-	 */
-	@Test
-	public void groupedDiscretizerTest() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		Set<StreamWindow<Integer>> expected = new HashSet<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(2, 2));
-		expected.add(StreamWindow.fromElements(1, 3));
-		expected.add(StreamWindow.fromElements(5, 11));
-		expected.add(StreamWindow.fromElements(4, 10));
-		expected.add(StreamWindow.fromElements(11));
-
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) {
-				return value % 2;
-			}
-		};
-
-		CloneableTriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
-		CloneableEvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
-		GroupedStreamDiscretizer<Integer> discretizer = new GroupedStreamDiscretizer<Integer>(
-				keySelector, trigger, eviction);
-
-		WindowBufferInvokable<Integer> buffer = new GroupedWindowBufferInvokable<Integer>(
-				new BasicWindowBuffer<Integer>(), keySelector);
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer,
-				inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-
-		assertEquals(expected, new HashSet<StreamWindow<Integer>>(result));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
deleted file mode 100644
index d892c48..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
-import org.junit.Test;
-
-public class ParallelMergeTest {
-
-	@Test
-	public void nonGroupedTest() throws Exception {
-
-		ReduceFunction<Integer> reducer = new ReduceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer a, Integer b) throws Exception {
-				return a + b;
-			}
-		};
-
-		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> output = out.getCollected();
-
-		ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
-		merger.numberOfDiscretizers = 2;
-
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertTrue(output.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertEquals(StreamWindow.fromElements(2), output.get(0));
-
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), out);
-		merger.flatMap1(createTestWindow(2), out);
-		merger.flatMap1(createTestWindow(2), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), out);
-		assertEquals(1, output.size());
-		merger.flatMap1(createTestWindow(2), out);
-		assertEquals(StreamWindow.fromElements(3), output.get(1));
-
-		// check error handling
-		merger.flatMap1(createTestWindow(3), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
-
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
-		merger.flatMap1(createTestWindow(4), out);
-		try {
-			merger.flatMap1(createTestWindow(4), out);
-			fail();
-		} catch (RuntimeException e) {
-			// Do nothing
-		}
-
-		ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
-		merger2.numberOfDiscretizers = 2;
-		merger2.flatMap1(createTestWindow(0), out);
-		merger2.flatMap1(createTestWindow(1), out);
-		merger2.flatMap1(createTestWindow(1), out);
-		merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		try {
-			merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-			fail();
-		} catch (RuntimeException e) {
-			// Do nothing
-		}
-
-	}
-
-	@Test
-	public void groupedTest() throws Exception {
-
-		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> output = out.getCollected();
-
-		ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
-		merger.numberOfDiscretizers = 2;
-
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertTrue(output.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertEquals(StreamWindow.fromElements(1, 1), output.get(0));
-	}
-
-	private StreamWindow<Integer> createTestWindow(Integer id) {
-		StreamWindow<Integer> ret = new StreamWindow<Integer>(id);
-		ret.add(1);
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
deleted file mode 100644
index bd81c20..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizerTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamDiscretizerTest {
-
-	/**
-	 * Test case equal to {@link WindowReduceInvokableTest}
-	 */
-	@Test
-	public void testWindowInvokableWithTimePolicy() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
-		expected.add(StreamWindow.fromElements(3, 4, 5));
-		expected.add(StreamWindow.fromElements(5));
-		expected.add(StreamWindow.fromElements(10));
-		expected.add(StreamWindow.fromElements(10, 11, 11));
-
-		Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		};
-
-		TriggerPolicy<Integer> trigger = new TimeTriggerPolicy<Integer>(2L,
-				new TimestampWrapper<Integer>(myTimeStamp, 3));
-
-		EvictionPolicy<Integer> eviction = new TimeEvictionPolicy<Integer>(4L,
-				new TimestampWrapper<Integer>(myTimeStamp, 1));
-		
-		
-
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
-		WindowBufferInvokable<Integer> buffer = new WindowBufferInvokable<Integer>(new BasicWindowBuffer<Integer>());
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-		
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void testWindowInvokableWithCountPolicy() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(1, 2));
-		expected.add(StreamWindow.fromElements(2, 3));
-		expected.add(StreamWindow.fromElements(4));
-
-		TriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
-
-		EvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
-
-		StreamDiscretizer<Integer> discretizer = new StreamDiscretizer<Integer>(trigger, eviction);
-		WindowBufferInvokable<Integer> buffer = new WindowBufferInvokable<Integer>(new BasicWindowBuffer<Integer>());
-
-		List<WindowEvent<Integer>> bufferEvents = MockContext.createAndExecute(discretizer, inputs);
-		List<StreamWindow<Integer>> result = MockContext.createAndExecute(buffer, bufferEvents);
-		assertEquals(expected, result);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
deleted file mode 100644
index bee6733..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattenerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFlattenerTest {
-
-	@Test
-	public void test() {
-		StreamInvokable<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
-
-		StreamWindow<Integer> w1 = StreamWindow.fromElements(1, 2, 3);
-		StreamWindow<Integer> w2 = new StreamWindow<Integer>();
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w2);
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.addAll(w1);
-		expected.addAll(w2);
-
-		List<Integer> output = MockContext.createAndExecute(flattener, input);
-
-		assertEquals(expected, output);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
deleted file mode 100644
index ccc01e4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowFolderTest {
-
-	@Test
-	public void test() {
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
-				new FoldFunction<Integer, String>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String fold(String accumulator, Integer value) throws Exception {
-						return accumulator + value.toString();
-					}
-				}, "");
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-		input.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<String>> expected = new ArrayList<StreamWindow<String>>();
-		expected.add(StreamWindow.fromElements("123"));
-		expected.add(new StreamWindow<String>());
-		expected.add(StreamWindow.fromElements("-1"));
-
-		List<StreamWindow<String>> output = MockContext.createAndExecute(windowReducer, input);
-
-		assertEquals(expected, output);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
deleted file mode 100644
index f0b5500..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowIntegrationTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final Integer MEMORYSIZE = 32;
-
-	@SuppressWarnings("serial")
-	public static class ModKey implements KeySelector<Integer, Integer> {
-		private int m;
-
-		public ModKey(int m) {
-			this.m = m;
-		}
-
-		@Override
-		public Integer getKey(Integer value) throws Exception {
-			return value % m;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static class IdentityWindowMap implements
-			WindowMapFunction<Integer, StreamWindow<Integer>> {
-
-		@Override
-		public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
-				throws Exception {
-
-			StreamWindow<Integer> window = new StreamWindow<Integer>();
-
-			for (Integer value : values) {
-				window.add(value);
-			}
-			out.collect(window);
-		}
-
-	}
-
-	@Test
-	public void test() throws Exception {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-
-		KeySelector<Integer, ?> key = new ModKey(2);
-
-		Timestamp<Integer> ts = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-		};
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
-		DataStream<Integer> source = env.fromCollection(inputs);
-
-		source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new CentralSink1());
-
-		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new CentralSink2());
-
-		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink1());
-
-		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
-				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
-
-		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
-				.addSink(new CentralSink3());
-
-		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
-				.addSink(new DistributedSink3());
-
-		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
-				.addSink(new DistributedSink4());
-
-		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
-				.getDiscretizedStream().addSink(new DistributedSink5());
-
-		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(Collector<Integer> collector) throws Exception {
-				for (int i = 1; i <= 10; i++) {
-					collector.collect(i);
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		});
-
-		DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(Collector<Integer> collector) throws Exception {
-				for (int i = 1; i <= 11; i++) {
-					if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
-						collector.collect(i);
-					}
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		});
-
-		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink6());
-
-		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
-				.addSink(new DistributedSink7());
-
-		env.execute();
-
-		// sum ( Time of 3 slide 2 )
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(5));
-		expected1.add(StreamWindow.fromElements(11));
-		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(10));
-		expected1.add(StreamWindow.fromElements(32));
-
-		validateOutput(expected1, CentralSink1.windows);
-
-		// Tumbling Time of 4 grouped by mod 2
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2, 4));
-		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(5));
-		expected2.add(StreamWindow.fromElements(10));
-		expected2.add(StreamWindow.fromElements(11, 11));
-
-		validateOutput(expected2, CentralSink2.windows);
-
-		// groupby mod 2 sum ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
-		expected3.add(StreamWindow.fromElements(4));
-		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(22));
-		expected3.add(StreamWindow.fromElements(8));
-		expected3.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected3, DistributedSink1.windows);
-
-		// groupby mod3 Tumbling Count of 2 grouped by mod 2
-		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
-		expected4.add(StreamWindow.fromElements(2, 2));
-		expected4.add(StreamWindow.fromElements(1));
-		expected4.add(StreamWindow.fromElements(4));
-		expected4.add(StreamWindow.fromElements(5, 11));
-		expected4.add(StreamWindow.fromElements(10));
-		expected4.add(StreamWindow.fromElements(11));
-		expected4.add(StreamWindow.fromElements(3));
-
-		validateOutput(expected4, DistributedSink2.windows);
-
-		// min ( Time of 2 slide 3 )
-		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(1));
-		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected5, CentralSink3.windows);
-
-		// groupby mod 2 max ( Tumbling Time of 4)
-		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
-		expected6.add(StreamWindow.fromElements(3));
-		expected6.add(StreamWindow.fromElements(5));
-		expected6.add(StreamWindow.fromElements(11));
-		expected6.add(StreamWindow.fromElements(4));
-		expected6.add(StreamWindow.fromElements(10));
-
-		validateOutput(expected6, DistributedSink3.windows);
-
-		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		expected7.add(StreamWindow.fromElements(10));
-		expected7.add(StreamWindow.fromElements(10, 11, 11));
-
-		validateOutput(expected7, DistributedSink4.windows);
-
-		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
-		expected8.add(StreamWindow.fromElements(4, 8));
-		expected8.add(StreamWindow.fromElements(4, 5));
-		expected8.add(StreamWindow.fromElements(10, 22));
-
-		for (List<Integer> sw : DistributedSink5.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected8, DistributedSink5.windows);
-
-		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
-		expected9.add(StreamWindow.fromElements(6));
-		expected9.add(StreamWindow.fromElements(14));
-		expected9.add(StreamWindow.fromElements(22));
-		expected9.add(StreamWindow.fromElements(30));
-		expected9.add(StreamWindow.fromElements(38));
-
-		validateOutput(expected9, DistributedSink6.windows);
-
-		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
-		expected10.add(StreamWindow.fromElements(6, 9));
-		expected10.add(StreamWindow.fromElements(16, 24));
-
-		for (List<Integer> sw : DistributedSink7.windows) {
-			Collections.sort(sw);
-		}
-
-		validateOutput(expected10, DistributedSink7.windows);
-
-	}
-
-	public static <R> void validateOutput(List<R> expected, List<R> actual) {
-		assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
-	}
-
-	@SuppressWarnings("serial")
-	private static class CentralSink1 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class CentralSink2 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class CentralSink3 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink1 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink2 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink3 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink4 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink6 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-	@SuppressWarnings("serial")
-	private static class DistributedSink7 implements SinkFunction<StreamWindow<Integer>> {
-
-		public static List<StreamWindow<Integer>> windows = Collections
-				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
-		@Override
-		public void invoke(StreamWindow<Integer> value) throws Exception {
-			windows.add(value);
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
deleted file mode 100644
index c4022ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapperTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowMapperTest {
-
-	@Test
-	public void test() {
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper = new WindowMapper<Integer, Integer>(
-				new WindowMapFunction<Integer, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void mapWindow(Iterable<Integer> values, Collector<Integer> out)
-							throws Exception {
-						for (Integer v : values) {
-							out.collect(v);
-						}
-					}
-				});
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMapper, input);
-
-		assertEquals(input, output);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
deleted file mode 100644
index 77037d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowMergerTest {
-
-	@Test
-	public void test() throws Exception {
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
-
-		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
-		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
-		StreamWindow<Integer> w3 = StreamWindow.fromElements(-1, 2, 3, 4);
-		StreamWindow<Integer> w4_1 = new StreamWindow<Integer>(1, 2);
-		StreamWindow<Integer> w4_2 = new StreamWindow<Integer>(1, 2);
-		w4_1.add(1);
-		w4_2.add(2);
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(w1);
-		expected.add(w2);
-		expected.add(w3);
-		expected.add(StreamWindow.fromElements(1, 2));
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w4_1);
-		input.addAll(StreamWindow.split(w2, 2));
-		input.addAll(StreamWindow.partitionBy(w3, new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value % 2;
-			}
-		}, false));
-		input.add(w4_2);
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);
-
-		assertEquals(expected.size(), expected.size());
-		for (int i = 0; i < output.size(); i++) {
-			assertEquals(new HashSet<Integer>(expected.get(i)), new HashSet<Integer>(output.get(i)));
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
deleted file mode 100644
index 9a2416c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowPartitionerTest {
-
-	@Test
-	public void test() throws Exception {
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner = new WindowPartitioner<Integer>(
-				2);
-
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner = new WindowPartitioner<Integer>(
-				new MyKey());
-
-		StreamWindow<Integer> w1 = new StreamWindow<Integer>();
-		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
-
-		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.addAll(StreamWindow.split(w1,2));
-		expected1.addAll(StreamWindow.split(w2,2));
-
-		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.addAll(StreamWindow.partitionBy(w1,new MyKey(),false));
-		expected2.addAll(StreamWindow.partitionBy(w2,new MyKey(),false));
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(w1);
-		input.add(w2);
-
-		List<StreamWindow<Integer>> output1 = MockContext.createAndExecute(splitPartitioner, input);
-		List<StreamWindow<Integer>> output2 = MockContext.createAndExecute(gbPartitioner, input);
-
-		assertEquals(expected1, output1);
-		assertEquals(expected2, output2);
-
-	}
-
-	private static class MyKey implements KeySelector<Integer, Object> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object getKey(Integer value) throws Exception {
-			return value / 2;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
deleted file mode 100644
index e3d742c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class WindowReducerTest {
-
-	@Test
-	public void test() {
-		StreamInvokable<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer = new WindowReducer<Integer>(
-				new ReduceFunction<Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				});
-
-		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
-		input.add(StreamWindow.fromElements(1, 2, 3));
-		input.add(new StreamWindow<Integer>());
-		input.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
-		expected.add(StreamWindow.fromElements(6));
-		expected.add(new StreamWindow<Integer>());
-		expected.add(StreamWindow.fromElements(-1));
-
-		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowReducer, input);
-
-		assertEquals(expected, output);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
new file mode 100644
index 0000000..dbbde29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.operators.StreamCounter;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class CounterTest {
+
+	@Test
+	public void counterTest() {
+		StreamCounter<String> operator = new StreamCounter<String>();
+
+		List<Long> expected = Arrays.asList(1L, 2L, 3L);
+		List<Long> actual = MockContext.createAndExecute(operator, Arrays.asList("one", "two", "three"));
+		
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
new file mode 100644
index 0000000..ebde006
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class FilterTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	static class MyFilter implements FilterFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Integer value) throws Exception {
+			return value % 2 == 0;
+		}
+	}
+
+	@Test 
+	public void test() {
+		StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());
+
+		List<Integer> expected = Arrays.asList(2, 4, 6);
+		List<Integer> actual = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+		
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
new file mode 100644
index 0000000..7f914dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class FlatMapTest {
+
+	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+			if (value % 2 == 0) {
+				out.collect(value);
+				out.collect(value * value);
+			}
+		}
+	}
+
+	@Test
+	public void flatMapTest() {
+		StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new MyFlatMap());
+		
+		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
+		List<Integer> actual = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
+		
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
new file mode 100644
index 0000000..7a45035
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedFoldTest {
+
+	private static class MyFolder implements FoldFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+
+	}
+
+	@Test
+	public void test() {
+		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
+
+		StreamGroupedFold<Integer, String> operator1 = new StreamGroupedFold<Integer, String>(
+				new MyFolder(), new KeySelector<Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Integer value) throws Exception {
+				return value.toString();
+			}
+		}, "100", outType);
+
+		List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
+		List<String> actual = MockContext.createAndExecute(operator1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
new file mode 100644
index 0000000..faaadbc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedReduceTest {
+
+	private static class MyReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+	@Test
+	public void test() {
+		StreamGroupedReduce<Integer> operator1 = new StreamGroupedReduce<Integer>(
+				new MyReducer(), new KeySelector<Integer, Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				});
+
+		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
+		List<Integer> actual = MockContext.createAndExecute(operator1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
new file mode 100644
index 0000000..394b5a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class MapTest {
+
+	private static class Map implements MapFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(Integer value) throws Exception {
+			return "+" + (value + 1);
+		}
+	}
+	
+	@Test
+	public void mapTest() {
+		StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());
+		
+		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
+		List<String> actualList = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3));
+		
+		assertEquals(expectedList, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
new file mode 100644
index 0000000..2cd3558
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.api.operators.StreamProject;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class ProjectTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Builds a StreamProject for field indices 4, 4 and 3 of a Tuple5 and compares the list returned by MockContext with hand-built Tuple3 values. */
+	@Test
+	public void test() {
+		// Type information of the input tuples, obtained from a sample instance.
+		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
+						4));
+		// Field indices to project (index 4 twice, then index 3) and the classes of the projected fields.
+		int[] fields = new int[] { 4, 4, 3 };
+		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
+		// The result of extractFieldTypes is cast to the Tuple3 type information, hence the unchecked suppression.
+		@SuppressWarnings("unchecked")
+		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator = new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				fields,
+				(TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
+						.extractFieldTypes(fields, classes, inType));
+
+		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
+		// Expected output: (f4, f4, f3) of each input tuple, in input order.
+		List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>();
+		expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
+		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
+
+		assertEquals(expected, MockContext.createAndExecute(operator, input));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFoldTest.java
new file mode 100644
index 0000000..b3307cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFoldTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.operators.StreamFold;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamFoldTest {
+	/** Test folder: appends the string form of each value to the accumulator. */
+	private static class MyFolder implements FoldFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+	}
+	/** Runs a StreamFold built from {@link MyFolder} over the inputs 1, 1, 2, 3, 3 through MockContext and checks the returned list. */
+	@Test
+	public void test() {
+		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
+		StreamFold<Integer, String> operator1 = new StreamFold<Integer, String>(
+				new MyFolder(), "", outType); // NOTE(review): "" is presumably the initial accumulator - confirm against StreamFold
+		// Expected: one entry per input, each the concatenation of all inputs up to and including that one.
+		List<String> expected = Arrays.asList("1","11","112","1123","11233");
+		List<String> actual = MockContext.createAndExecute(operator1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamReduceTest.java
new file mode 100644
index 0000000..0c18d8d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamReduceTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.operators.StreamReduce;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamReduceTest {
+	/** Test reducer: returns the sum of its two arguments. */
+	private static class MyReducer implements ReduceFunction<Integer>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1+value2;
+		}
+		
+	}
+	/** Runs a StreamReduce built from {@link MyReducer} over the inputs 1, 1, 2, 3, 3 through MockContext and checks the returned list. */
+	@Test
+	public void test() {
+		StreamReduce<Integer> operator1 = new StreamReduce<Integer>(
+				new MyReducer());
+		// Expected: the running sum after each input (1, 1+1, 2+2, 4+3, 7+3).
+		List<Integer> expected = Arrays.asList(1,2,4,7,10);
+		List<Integer> actual = MockContext.createAndExecute(operator1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
new file mode 100644
index 0000000..99cc62f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoFlatMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Test function: flatMap1 collects each one-character substring of the string, flatMap2 collects the integer's string form. */
+	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(String value, Collector<String> coll) {
+			for (int i = 0; i < value.length(); i++) {
+				coll.collect(value.substring(i, i + 1));
+			}
+		}
+
+		@Override
+		public void flatMap2(Integer value, Collector<String> coll) {
+			coll.collect(value.toString());
+		}
+	}
+	/** Runs a CoStreamFlatMap built from {@link MyCoFlatMap} over a string input and an integer input through MockCoContext and checks the returned list. */
+	@Test
+	public void coFlatMapTest() {
+		CoStreamFlatMap<String, Integer, String> invokable = new CoStreamFlatMap<String, Integer, String>(
+				new MyCoFlatMap());
+		// The expected list alternates between the two inputs while both have elements; the leftover integers come last.
+		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
+				"e", "3", "4", "5");
+		List<String> actualList = MockCoContext.createAndExecute(invokable,
+				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
+
+		assertEquals(expectedList, actualList);
+	}
+	/** Expects a RuntimeException when ds1 is merged with ds2, a stream that was itself created by merging ds1. */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void multipleInputTest() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
+		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
+		
+		try {
+			ds1.forward().merge(ds2);
+			fail(); // reached only if the merge above did not throw
+		} catch (RuntimeException e) {
+			// expected
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
new file mode 100644
index 0000000..d01d0d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoGroupedReduceTest {
+	/** Test function: reduce1 concatenates the f1 strings and reduce2 adds the f1 integers, both keeping value1's other fields; map1 and map2 return f1 in string form. */
+	private final static class MyCoReduceFunction implements
+			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+				Tuple3<String, String, String> value2) {
+			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+				Tuple2<Integer, Integer> value2) {
+			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+		}
+
+		@Override
+		public String map1(Tuple3<String, String, String> value) {
+			return value.f1;
+		}
+
+		@Override
+		public String map2(Tuple2<Integer, Integer> value) {
+			return value.f1.toString();
+		}
+	}
+	/** Runs CoStreamGroupedReduce twice through MockCoContext, with two different key selectors for the first input, and checks both returned lists. */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void coGroupedReduceTest() {
+		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
+		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
+		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
+		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
+		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
+		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
+		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
+		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
+		// Key selector for the first input: returns f0.
+		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Tuple3<String, String, String> value) throws Exception {
+				return value.f0;
+			}
+		};
+		// Key selector for the second input: returns f0.
+		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f0;
+			}
+		};
+		// Alternative key selector for the first input: returns f2.
+		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Tuple3<String, String, String> value) throws Exception {
+				return value.f2;
+			}
+		};
+
+		CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), keySelector0, keySelector1);
+		// Keyed on f0: word1 and word3 share key "a" ("word1word3"); the integers are summed per f0 key (1+4 = 5, 2+5 = 7).
+		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+				"7");
+
+		List<String> actualList = MockCoContext.createAndExecute(invokable,
+				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+		assertEquals(expected, actualList);
+
+		invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), keySelector2, keySelector1);
+		// Keyed on f2 instead: word2 and word3 share key "a" ("word2word3"); the integer results are the same as before.
+		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
+
+		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
+				Arrays.asList(int1, int2, int3, int4, int5));
+
+		assertEquals(expected, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
new file mode 100644
index 0000000..2a2560d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+	/** Test function: map1 and map2 both return the string form of their argument. */
+	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(Double value) {
+			return value.toString();
+		}
+
+		@Override
+		public String map2(Integer value) {
+			return value.toString();
+		}
+	}
+	/** Runs a CoStreamMap built from {@link MyCoMap} over a double input and an integer input through MockCoContext and checks the returned list. */
+	@Test
+	public void coMapTest() {
+		CoStreamMap<Double, Integer, String> invokable = new CoStreamMap<Double, Integer, String>(new MyCoMap());
+		// The expected list alternates between the two inputs until the shorter (integer) input is used up.
+		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
+		List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+		
+		assertEquals(expectedList, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamReduceTest.java
new file mode 100644
index 0000000..8a769ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamReduceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.api.operators.co.CoStreamReduce;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+public class CoStreamReduceTest {
+	/** Test function: reduce1 multiplies integers, reduce2 concatenates strings; map1 returns the integer unchanged, map2 parses the string as an integer. */
+	public static class MyCoReduceFunction implements
+			CoReduceFunction<Integer, String, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce1(Integer value1, Integer value2) {
+			return value1 * value2;
+		}
+
+		@Override
+		public String reduce2(String value1, String value2) {
+			return value1 + value2;
+		}
+
+		@Override
+		public Integer map1(Integer value) {
+			return value;
+		}
+
+		@Override
+		public Integer map2(String value) {
+			return Integer.parseInt(value);
+		}
+
+	}
+	/** Runs a CoStreamReduce built from {@link MyCoReduceFunction} over an integer input and a string input through MockCoContext and checks the returned list. */
+	@Test
+	public void coStreamReduceTest() {
+
+		CoStreamReduce<Integer, String, Integer> coReduce = new CoStreamReduce<Integer, String, Integer>(
+				new MyCoReduceFunction());
+		// Expected: running products of 1, 2, 3, 4 interleaved with the running concatenations of "9", "9", "8" parsed as integers.
+		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
+		List<Integer> result = MockCoContext.createAndExecute(coReduce,
+				Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
+
+		assertEquals(expected1, result);
+
+	}
+}


Mime
View raw message