flink-commits mailing list archives

From rmetz...@apache.org
Subject [07/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:45 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
new file mode 100644
index 0000000..403dd17
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class FilterTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	static class MyFilter implements FilterFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Integer value) throws Exception {
+			return value % 2 == 0;
+		}
+	}
+
+	@Test 
+	public void test() {
+		FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
+
+		List<Integer> expected = Arrays.asList(2, 4, 6);
+		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+		
+		assertEquals(expected, actual);
+	}
+}

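MockContext.createAndExecute, used throughout these tests, pushes a list of elements through an invokable and collects the output. A minimal plain-JDK sketch of the same idea, with illustrative helper names rather than Flink API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterSketch {

	// Feed each element through the predicate and keep what passes,
	// mirroring what the FilterInvokable test above asserts.
	static <T> List<T> applyFilter(Predicate<T> filter, List<T> input) {
		List<T> out = new ArrayList<T>();
		for (T value : input) {
			if (filter.test(value)) {
				out.add(value);
			}
		}
		return out;
	}

	public static void main(String[] args) {
		System.out.println(applyFilter(v -> v % 2 == 0,
				Arrays.asList(1, 2, 3, 4, 5, 6, 7))); // [2, 4, 6]
	}
}
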
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
new file mode 100644
index 0000000..7424e21
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class FlatMapTest {
+
+	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+			if (value % 2 == 0) {
+				out.collect(value);
+				out.collect(value * value);
+			}
+		}
+	}
+
+	@Test
+	public void flatMapTest() {
+		FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
+		
+		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
+		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
+		
+		assertEquals(expected, actual);
+	}
+}

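The expected list is worth decoding: every even input emits itself immediately followed by its square, and odd inputs emit nothing. The same collector pattern in a self-contained plain-Java sketch (names illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class FlatMapSketch {

	// Each input may emit zero or more outputs into the collector list.
	static <I, O> List<O> applyFlatMap(List<I> input, BiConsumer<I, List<O>> fn) {
		List<O> out = new ArrayList<O>();
		for (I value : input) {
			fn.accept(value, out);
		}
		return out;
	}

	public static void main(String[] args) {
		List<Integer> actual = applyFlatMap(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8),
				(Integer v, List<Integer> out) -> {
					if (v % 2 == 0) {
						out.add(v);     // the value itself ...
						out.add(v * v); // ... then its square
					}
				});
		System.out.println(actual); // [2, 4, 4, 16, 6, 36, 8, 64]
	}
}
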
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
new file mode 100755
index 0000000..ce47c67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedReduceInvokableTest {
+
+	private static class MyReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+	@Test
+	public void test() {
+		GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>(
+				new MyReducer(), new KeySelector<Integer, Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				});
+
+		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
+		List<Integer> actual = MockContext.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+}

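The expected list here encodes a running per-group reduce: after each element the invokable emits the new reduced value of that element's group, so identity-keyed sums over 1, 1, 2, 2, 3 yield 1, 2, 2, 4, 3. A plain-Java sketch of that behaviour (illustrative, not the Flink implementation):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class GroupedRunningReduceSketch {

	// After each element, emit the new reduced value of that element's group.
	static <T, K> List<T> run(Function<T, K> key, BinaryOperator<T> reducer, List<T> input) {
		Map<K, T> state = new HashMap<K, T>();
		List<T> out = new ArrayList<T>();
		for (T value : input) {
			out.add(state.merge(key.apply(value), value, reducer));
		}
		return out;
	}

	public static void main(String[] args) {
		System.out.println(run(v -> v, Integer::sum, Arrays.asList(1, 1, 2, 2, 3)));
		// [1, 2, 2, 4, 3]
	}
}
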
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
new file mode 100644
index 0000000..f38d5c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedWindowInvokableTest {
+
+	KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<Integer, String> value) throws Exception {
+			return value.f1;
+		}
+	};
+
+	/**
+	 * Tests that illegal arguments result in failure. The following cases
+	 * are tested: 1) having neither trigger nor eviction (first as empty
+	 * policy lists, then as null), 2) having no eviction, 3) having no
+	 * trigger, and 4) having both central and distributed eviction.
+	 */
+	@Test
+	public void testGroupedWindowInvokableFailTest() {
+
+		// create dummy reduce function
+		ReduceFunction<Object> userFunction = new ReduceFunction<Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Object reduce(Object value1, Object value2) throws Exception {
+				return null;
+			}
+		};
+
+		// create dummy keySelector
+		KeySelector<Object, Object> keySelector = new KeySelector<Object, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Object getKey(Object value) throws Exception {
+				return null;
+			}
+		};
+
+		// create policy lists
+		LinkedList<CloneableEvictionPolicy<Object>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>();
+		LinkedList<CloneableTriggerPolicy<Object>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>();
+		LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = new LinkedList<EvictionPolicy<Object>>();
+		LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new LinkedList<TriggerPolicy<Object>>();
+
+		// empty trigger and policy lists should fail
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// null for trigger and policy lists should fail
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, null, null, null,
+					null);
+			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// empty eviction should still fail
+		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		distributedTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// empty trigger should still fail
+		centralTriggerPolicies.clear();
+		distributedTriggerPolicies.clear();
+		centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// having both central and distributed eviction at the same time
+		// should fail
+		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		distributedEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (5)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+	}
+
+	/**
+	 * Test for non-active distributed triggers with a single field.
+	 */
+	@Test
+	public void testGroupedWindowInvokableDistributedTriggerSimple() {
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(1);
+		inputs.add(5);
+		inputs.add(5);
+		inputs.add(5);
+		inputs.add(1);
+		inputs.add(1);
+		inputs.add(5);
+		inputs.add(1);
+		inputs.add(5);
+
+		List<Integer> expectedDistributedEviction = new ArrayList<Integer>();
+		expectedDistributedEviction.add(15);
+		expectedDistributedEviction.add(3);
+		expectedDistributedEviction.add(3);
+		expectedDistributedEviction.add(15);
+
+		List<Integer> expectedCentralEviction = new ArrayList<Integer>();
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(15);
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(1);
+		expectedCentralEviction.add(5);
+
+		LinkedList<CloneableTriggerPolicy<Integer>> triggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
+		// Trigger on every 2nd element, but the first time after the 3rd
+		triggers.add(new CountTriggerPolicy<Integer>(2, -1));
+
+		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
+		// On every 2nd element, remove the oldest 2 elements, but the first
+		// time after the 3rd element
+		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+		LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
+
+		ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) {
+				return value;
+			}
+		};
+
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
+				reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
+
+		List<Integer> result = MockContext.createAndExecute(invokable, inputs);
+
+		List<Integer> actual = new LinkedList<Integer>();
+		for (Integer current : result) {
+			actual.add(current);
+		}
+
+		assertEquals(new HashSet<Integer>(expectedDistributedEviction),
+				new HashSet<Integer>(actual));
+		assertEquals(expectedDistributedEviction.size(), actual.size());
+
+		// Run test with central eviction
+		triggers.clear();
+		centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
+		LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>();
+		centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+		invokable = new GroupedWindowInvokable<Integer, Integer>(reduceFunction, keySelector,
+				triggers, null, centralTriggers, centralEvictions);
+
+		result = MockContext.createAndExecute(invokable, inputs);
+		actual = new LinkedList<Integer>();
+		for (Integer current : result) {
+			actual.add(current);
+		}
+
+		assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(actual));
+		assertEquals(expectedCentralEviction.size(), actual.size());
+	}
+
+	/**
+	 * Test for non-active distributed triggers with a separate key field.
+	 */
+	@Test
+	public void testGroupedWindowInvokableDistributedTriggerComplex() {
+		List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
+		inputs2.add(new Tuple2<Integer, String>(1, "a"));
+		inputs2.add(new Tuple2<Integer, String>(0, "b"));
+		inputs2.add(new Tuple2<Integer, String>(2, "a"));
+		inputs2.add(new Tuple2<Integer, String>(-1, "a"));
+		inputs2.add(new Tuple2<Integer, String>(-2, "a"));
+		inputs2.add(new Tuple2<Integer, String>(10, "a"));
+		inputs2.add(new Tuple2<Integer, String>(2, "b"));
+		inputs2.add(new Tuple2<Integer, String>(1, "a"));
+
+		List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
+		expected2.add(new Tuple2<Integer, String>(-1, "a"));
+		expected2.add(new Tuple2<Integer, String>(-2, "a"));
+		expected2.add(new Tuple2<Integer, String>(0, "b"));
+
+		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
+		// Trigger on every 3rd element
+		triggers.add(new CountTriggerPolicy<Tuple2<Integer, String>>(3));
+
+		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
+		// Evict the whole window content whenever a trigger occurs
+		// (tumbling eviction)
+		evictions.add(new TumblingEvictionPolicy<Tuple2<Integer, String>>());
+
+		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
+
+		GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+				new ReduceFunction<Tuple2<Integer, String>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
+							Tuple2<Integer, String> value2) throws Exception {
+						if (value1.f0 <= value2.f0) {
+							return value1;
+						} else {
+							return value2;
+						}
+					}
+				}, keySelector, triggers, evictions, centralTriggers, null);
+
+		List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2);
+
+		List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
+		for (Tuple2<Integer, String> current : result) {
+			actual2.add(current);
+		}
+
+		assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
+				new HashSet<Tuple2<Integer, String>>(actual2));
+		assertEquals(expected2.size(), actual2.size());
+	}
+
+	/**
+	 * Test for an active centralized trigger.
+	 */
+	@Test
+	public void testGroupedWindowInvokableCentralActiveTrigger() {
+
+		List<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>();
+		inputs.add(new Tuple2<Integer, String>(1, "a"));
+		inputs.add(new Tuple2<Integer, String>(1, "b"));
+		inputs.add(new Tuple2<Integer, String>(1, "c"));
+		inputs.add(new Tuple2<Integer, String>(2, "a"));
+		inputs.add(new Tuple2<Integer, String>(2, "b"));
+		inputs.add(new Tuple2<Integer, String>(2, "c"));
+		inputs.add(new Tuple2<Integer, String>(2, "b"));
+		inputs.add(new Tuple2<Integer, String>(2, "a"));
+		inputs.add(new Tuple2<Integer, String>(2, "c"));
+		inputs.add(new Tuple2<Integer, String>(3, "c"));
+		inputs.add(new Tuple2<Integer, String>(3, "a"));
+		inputs.add(new Tuple2<Integer, String>(3, "b"));
+		inputs.add(new Tuple2<Integer, String>(4, "a"));
+		inputs.add(new Tuple2<Integer, String>(4, "b"));
+		inputs.add(new Tuple2<Integer, String>(4, "c"));
+		inputs.add(new Tuple2<Integer, String>(5, "c"));
+		inputs.add(new Tuple2<Integer, String>(5, "a"));
+		inputs.add(new Tuple2<Integer, String>(5, "b"));
+		inputs.add(new Tuple2<Integer, String>(10, "b"));
+		inputs.add(new Tuple2<Integer, String>(10, "a"));
+		inputs.add(new Tuple2<Integer, String>(10, "c"));
+		inputs.add(new Tuple2<Integer, String>(11, "a"));
+		inputs.add(new Tuple2<Integer, String>(11, "a"));
+		inputs.add(new Tuple2<Integer, String>(11, "c"));
+		inputs.add(new Tuple2<Integer, String>(11, "c"));
+		inputs.add(new Tuple2<Integer, String>(11, "b"));
+		inputs.add(new Tuple2<Integer, String>(11, "b"));
+
+		// Expected result:
+		// For each group (a,b and c):
+		// 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
+		// 12-12-5-10-32
+
+		List<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>();
+		expected.add(new Tuple2<Integer, String>(12, "a"));
+		expected.add(new Tuple2<Integer, String>(12, "b"));
+		expected.add(new Tuple2<Integer, String>(12, "c"));
+		expected.add(new Tuple2<Integer, String>(12, "a"));
+		expected.add(new Tuple2<Integer, String>(12, "b"));
+		expected.add(new Tuple2<Integer, String>(12, "c"));
+		expected.add(new Tuple2<Integer, String>(5, "a"));
+		expected.add(new Tuple2<Integer, String>(5, "b"));
+		expected.add(new Tuple2<Integer, String>(5, "c"));
+		expected.add(new Tuple2<Integer, String>(10, "a"));
+		expected.add(new Tuple2<Integer, String>(10, "b"));
+		expected.add(new Tuple2<Integer, String>(10, "c"));
+		expected.add(new Tuple2<Integer, String>(32, "a"));
+		expected.add(new Tuple2<Integer, String>(32, "b"));
+		expected.add(new Tuple2<Integer, String>(32, "c"));
+
+		Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Tuple2<Integer, String> value) {
+				return value.f0;
+			}
+		};
+
+		TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>(
+				myTimeStamp, 1);
+
+		ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
+					Tuple2<Integer, String> value2) throws Exception {
+				return new Tuple2<Integer, String>(value1.f0 + value2.f0, value1.f1);
+			}
+		};
+
+		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
+		// Trigger every 2 time units but delay the first trigger by 2 (First
+		// trigger after 4, then every 2)
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
+
+		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
+		// Always delete all elements older than 4
+		evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
+
+		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
+
+		GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+				myReduceFunction, keySelector, distributedTriggers, evictions, triggers, null);
+
+		ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
+		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(result));
+		assertEquals(expected.size(), result.size());
+
+		// repeat the test with central eviction. The result should be the same.
+		triggers.clear();
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
+		evictions.clear();
+		LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
+		centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
+
+		invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+				myReduceFunction, keySelector, distributedTriggers, evictions, triggers,
+				centralEvictions);
+
+		result = new ArrayList<Tuple2<Integer, String>>();
+		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(result));
+		assertEquals(expected.size(), result.size());
+	}
+
+	/**
+	 * Test for multiple centralized triggers.
+	 */
+	@Test
+	public void testGroupedWindowInvokableMultipleCentralTrigger() {
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		triggers.add(new CountTriggerPolicy<Integer>(8));
+		triggers.add(new CountTriggerPolicy<Integer>(5));
+
+		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
+		// The active wrapper causes eviction even on (fake) elements which
+		// triggered, but do not belong to the group.
+		evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
+				new TumblingEvictionPolicy<Integer>()));
+
+		LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(1);
+		// 1st Trigger: 2;6
+		inputs.add(2);
+		inputs.add(1);
+		inputs.add(2);
+		// 2nd Trigger: 1;4
+		inputs.add(2);
+		inputs.add(1);
+		// Final: 1,2
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(2);
+		expected.add(6);
+		expected.add(4);
+		expected.add(1);
+		expected.add(2);
+		expected.add(1);
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
+				myReduceFunction, new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) {
+						return value;
+					}
+				}, distributedTriggers, evictions, triggers, null);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
+		assertEquals(expected.size(), result.size());
+	}
+
+	/**
+	 * Test for the combination of centralized and distributed triggers at
+	 * the same time.
+	 */
+	@Test
+	public void testGroupedWindowInvokableCentralAndDistrTrigger() {
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		triggers.add(new CountTriggerPolicy<Integer>(8));
+		triggers.add(new CountTriggerPolicy<Integer>(5));
+
+		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
+		// The active wrapper causes eviction even on (fake) elements which
+		// triggered, but do not belong to the group.
+		evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
+				new TumblingEvictionPolicy<Integer>()));
+
+		LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
+		distributedTriggers.add(new CountTriggerPolicy<Integer>(2));
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		// local on 2 => 4
+		inputs.add(2);
+		inputs.add(1);
+		// local on 1 => 2
+		// and 1st Central: 2;2
+		// SUMS up to 2;2
+		inputs.add(2);
+		// local on 2 => 2
+		inputs.add(1);
+		inputs.add(2);
+		// 2nd Central: 1;2
+		inputs.add(2);
+		inputs.add(1);
+		// Final: 1,2
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(4);
+		expected.add(2);
+		expected.add(2);
+		expected.add(2);
+		expected.add(1);
+		expected.add(2);
+		expected.add(1);
+		expected.add(2);
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
+				myReduceFunction, new KeySelector<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(Integer value) {
+						return value;
+					}
+				}, distributedTriggers, evictions, triggers, null);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
+		assertEquals(expected.size(), result.size());
+	}
+
+}

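The central idea these tests exercise is the split between distributed policies, which are cloned once per key so every group counts its own elements, and central policies, which see the whole stream. A minimal sketch of the distributed case, assuming the (2, -1) start-value semantics described in the comments above (the fourth argument names are illustrative, not Flink API):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class PerGroupTriggerSketch {

	interface Trigger<T> {
		boolean notifyTrigger(T element);
	}

	// Counting starts at `start`, so a (2, -1) count trigger fires the
	// first time on the 3rd element and on every 2nd element after that.
	static <T> Trigger<T> countTrigger(final int max, final int start) {
		return new Trigger<T>() {
			private int counter = start;

			@Override
			public boolean notifyTrigger(T element) {
				if (++counter == max) {
					counter = 0;
					return true;
				}
				return false;
			}
		};
	}

	public static void main(String[] args) {
		// Distributed triggers: every key gets its own clone of the policy.
		Supplier<Trigger<Integer>> factory = () -> countTrigger(2, -1);
		Map<Integer, Trigger<Integer>> perKey = new HashMap<Integer, Trigger<Integer>>();
		for (int v : Arrays.asList(1, 1, 5, 5, 5, 1, 1, 5, 1, 5)) {
			if (perKey.computeIfAbsent(v, k -> factory.get()).notifyTrigger(v)) {
				System.out.println("group " + v + " triggers");
			}
		}
		// Groups 5 and 1 each trigger twice, in the order 5, 1, 1, 5 --
		// matching the four sums 15, 3, 3, 15 expected in
		// testGroupedWindowInvokableDistributedTriggerSimple.
	}
}
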
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
new file mode 100644
index 0000000..5390ec9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class MapTest {
+
+	private static class Map implements MapFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(Integer value) throws Exception {
+			return "+" + (value + 1);
+		}
+	}
+	
+	@Test
+	public void mapInvokableTest() {
+		MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
+		
+		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
+		List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+		
+		assertEquals(expectedList, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
new file mode 100644
index 0000000..11c44cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class ProjectTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	@Test
+	public void test() {
+
+		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
+						4));
+
+		int[] fields = new int[] { 4, 4, 3 };
+		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
+
+		@SuppressWarnings("unchecked")
+		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				fields,
+				(TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
+						.extractFieldTypes(fields, classes, inType));
+
+		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
+		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
+
+		List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>();
+		expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
+		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
+
+		assertEquals(expected, MockContext.createAndExecute(invokable, input));
+	}
+}

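The fields array {4, 4, 3} reads: output fields f0 and f1 both come from input position 4, and f2 comes from position 3. The projection itself is positional copying, sketched here over plain Object arrays rather than Flink's tuple machinery:

import java.util.Arrays;

public class ProjectSketch {

	// Copy the selected input positions, in order, into the smaller record.
	static Object[] project(Object[] input, int[] fields) {
		Object[] out = new Object[fields.length];
		for (int i = 0; i < fields.length; i++) {
			out[i] = input[fields[i]];
		}
		return out;
	}

	public static void main(String[] args) {
		Object[] in = { 2, "a", 3, "b", 4 }; // the first Tuple5 from the test
		System.out.println(Arrays.toString(project(in, new int[] { 4, 4, 3 })));
		// [4, 4, b]
	}
}
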
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
new file mode 100755
index 0000000..ae866e6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamReduceTest {
+
+	private static class MyReducer implements ReduceFunction<Integer>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+		
+	}
+	
+	@Test
+	public void test() {
+		StreamReduceInvokable<Integer> invokable1 = new StreamReduceInvokable<Integer>(
+				new MyReducer());
+
+		List<Integer> expected = Arrays.asList(1, 2, 4, 7, 10);
+		List<Integer> actual = MockContext.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

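As in the grouped variant, the expected list shows that the invokable emits the running aggregate after every element: 1, 1+1, 2+2, 4+3, 7+3. A minimal sketch of that behaviour:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class RunningReduceSketch {

	// Emit the accumulated value after each element; the first element
	// is emitted as-is because there is nothing to reduce it with yet.
	static <T> List<T> run(BinaryOperator<T> reducer, List<T> input) {
		List<T> out = new ArrayList<T>();
		T acc = null;
		for (T value : input) {
			acc = (acc == null) ? value : reducer.apply(acc, value);
			out.add(acc);
		}
		return out;
	}

	public static void main(String[] args) {
		System.out.println(run(Integer::sum, Arrays.asList(1, 1, 2, 3, 3)));
		// [1, 2, 4, 7, 10]
	}
}
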
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
new file mode 100644
index 0000000..83b4596
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowInvokableTest {
+
+	/**
+	 * Test case equivalent to {@link WindowReduceInvokableTest}.
+	 */
+	@Test
+	public void testWindowInvokableWithTimePolicy() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(3);
+		inputs.add(4);
+		inputs.add(5);
+		inputs.add(10);
+		inputs.add(11);
+		inputs.add(11);
+		// 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
+		// 12-12-5-10-32
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(12);
+		expected.add(12);
+		expected.add(5);
+		expected.add(10);
+		expected.add(32);
+
+		Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+		};
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger every 2 time units but delay the first trigger by 2 (First
+		// trigger after 4, then every 2)
+		triggers.add(new TimeTriggerPolicy<Integer>(2L, new TimestampWrapper<Integer>(myTimeStamp,
+				1), 2L));
+
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// Always delete all elements older than 4
+		evictions.add(new TimeEvictionPolicy<Integer>(4L, new TimestampWrapper<Integer>(
+				myTimeStamp, 1)));
+
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(expected, result);
+	}
+
+	/**
+	 * Test case equivalent to {@link BatchReduceTest}.
+	 */
+	@Test
+	public void testWindowInvokableWithCountPolicy() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		/*
+		 * The following setup reproduces the batch size 3 and the slide size 2
+		 * of the BatchReduceInvokable.
+		 */
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger on every 2nd element, but the first time after the 3rd
+		triggers.add(new CountTriggerPolicy<Integer>(2, -1));
+
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// On every 2nd element, remove the oldest 2 elements, but the first
+		// time after the 3rd element
+		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(6);
+		expected.add(12);
+		expected.add(18);
+		expected.add(24);
+		expected.add(19);
+		List<Integer> result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+		assertEquals(expected, result);
+
+		/*
+		 * Begin test part 2
+		 */
+
+		List<Integer> inputs2 = new ArrayList<Integer>();
+		inputs2.add(1);
+		inputs2.add(2);
+		inputs2.add(-5); // changed this value to make sure it is excluded from
+							// the result
+		inputs2.add(-3);
+		inputs2.add(-4);
+
+		myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				if (value1 <= value2) {
+					return value1;
+				} else {
+					return value2;
+				}
+			}
+		};
+
+		/*
+		 * The following setup reproduces the batch size 2 and the slide size 3
+		 * of the BatchReduceInvokable.
+		 */
+		triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger after every 3rd element, but the first time after the 2nd
+		triggers.add(new CountTriggerPolicy<Integer>(3, 1));
+
+		evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// On every 3rd element, remove the oldest 3 elements, but the first
+		// time on the 5th element
+		evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
+
+		WindowInvokable<Integer, Integer> invokable2 = new WindowReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
+
+		List<Integer> expected2 = new ArrayList<Integer>();
+		expected2.add(1);
+		expected2.add(-4);
+
+		result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) {
+			result.add(t);
+		}
+
+		assertEquals(expected2, result);
+
+	}
+
+	@Test
+	public void testWindowInvokableWithMultiplePolicies() {
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		triggers.add(new CountTriggerPolicy<Integer>(2));
+		triggers.add(new CountTriggerPolicy<Integer>(3));
+
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		evictions.add(new CountEvictionPolicy<Integer>(2, 2));
+		evictions.add(new CountEvictionPolicy<Integer>(3, 3));
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+		/**
+		 * <code>
+		 * VAL: 1,2,3,4,5,6,7,8,9,10
+		 * TR1:   |   |   |   |   |
+		 * TR2:     |     |     |
+		 * EV1:   2   2   2   2   2
+		 * EV2:     3     3     3
+		 * </code>
+		 */
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(3);
+		expected.add(3);
+		expected.add(4);
+		expected.add(11);
+		expected.add(15);
+		expected.add(9);
+		expected.add(10);
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(expected, result);
+	}
+
+}

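As the comments note, the (2, -1) trigger and (2, 2, -1) eviction pair emulates a count window of size 3 sliding by 2. The expected numbers can be reproduced with a plain-Java sliding window; this sketches the arithmetic, not the policy internals:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CountWindowSketch {

	// Emit the sum of a window of `size` elements, then slide by `slide`;
	// any leftover partial window is emitted when the input ends.
	static List<Integer> slidingSums(List<Integer> input, int size, int slide) {
		List<Integer> out = new ArrayList<Integer>();
		Deque<Integer> buffer = new ArrayDeque<Integer>();
		int sinceLastTrigger = 0;
		boolean firstWindow = true;
		for (int v : input) {
			buffer.addLast(v);
			if (++sinceLastTrigger == (firstWindow ? size : slide)) {
				out.add(buffer.stream().mapToInt(Integer::intValue).sum());
				for (int i = 0; i < slide && !buffer.isEmpty(); i++) {
					buffer.removeFirst(); // evict the oldest `slide` elements
				}
				sinceLastTrigger = 0;
				firstWindow = false;
			}
		}
		if (!buffer.isEmpty()) {
			out.add(buffer.stream().mapToInt(Integer::intValue).sum());
		}
		return out;
	}

	public static void main(String[] args) {
		List<Integer> inputs = new ArrayList<Integer>();
		for (int i = 1; i <= 10; i++) {
			inputs.add(i);
		}
		System.out.println(slidingSums(inputs, 3, 2)); // [6, 12, 18, 24, 19]
	}
}
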
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
new file mode 100644
index 0000000..b044d88
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamrecord;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class UIDTest {
+
+	//TODO fix with matching DataOutputStream and DataOutputView
+	@Test
+	public void test() throws IOException {
+		DataOutputSerializer out = new DataOutputSerializer(64);
+		
+		UID id = new UID(3);
+		id.write(out);
+
+		ByteBuffer buff = out.wrapAsByteBuffer();
+
+		DataInputDeserializer in = new DataInputDeserializer(buff);
+		
+		UID id2 = new UID();
+		id2.read(in);
+
+		assertEquals(id.getChannelId(), id2.getChannelId());
+		assertArrayEquals(id.getGeneratedId(), id2.getGeneratedId());
+		assertArrayEquals(id.getId(), id2.getId());
+	}
+
+}

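The test is a plain write/read round-trip through a byte buffer. With JDK streams in place of Flink's serializer utilities, the same pattern looks like this (illustrative values):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripSketch {

	public static void main(String[] args) throws IOException {
		// write side: serialize some fields
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);
		out.writeInt(3);
		out.writeLong(42L);

		// read side: deserialize from the produced bytes and compare
		DataInputStream in = new DataInputStream(
				new ByteArrayInputStream(bytes.toByteArray()));
		boolean same = in.readInt() == 3 && in.readLong() == 42L;
		System.out.println(same); // true
	}
}
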
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
new file mode 100755
index 0000000..a40048e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+
+public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
+
+	public ArrayList<Integer> emittedRecords;
+
+	public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
+		super(inputBase.getEnvironment().getWriter(0));
+	}
+
+	public boolean initList() {
+		emittedRecords = new ArrayList<Integer>();
+		return true;
+	}
+	
+	@Override
+	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
+		emittedRecords.add(record.getInstance().getObject().f0);
+	}
+}

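MockRecordWriter is a recording test double: instead of forwarding records downstream, emit() remembers each payload so a test can assert on what was emitted. The same idea in miniature:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RecordingSinkSketch {

	// Remembers everything it is given instead of sending it anywhere.
	static class RecordingSink<T> implements Consumer<T> {
		final List<T> emitted = new ArrayList<T>();

		@Override
		public void accept(T record) {
			emitted.add(record);
		}
	}

	public static void main(String[] args) {
		RecordingSink<Integer> sink = new RecordingSink<Integer>();
		for (int i = 0; i < 3; i++) {
			sink.accept(i);
		}
		System.out.println(sink.emitted); // [0, 1, 2]
	}
}
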
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
new file mode 100644
index 0000000..b103d84
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class StreamVertexTest {
+
+	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+
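+	/** Emits the integers 0 to 9, reusing a single Tuple1 instance for every record. */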
+	public static class MySource implements SourceFunction<Tuple1<Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
+
+		@Override
+		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+			for (int i = 0; i < 10; i++) {
+				tuple.f0 = i;
+				collector.collect(tuple);
+			}
+		}
+	}
+
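+	/** Maps each input value i to the pair (i, i + 1). */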
+	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
+			Integer i = value.f0;
+			return new Tuple2<Integer, Integer>(i, i + 1);
+		}
+	}
+
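+	/** Records every incoming (key, value) pair in the static data map. */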
+	public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Integer, Integer> tuple) {
+			Integer k = tuple.getField(0);
+			Integer v = tuple.getField(1);
+			data.put(k, v);
+		}
+	}
+
+	@SuppressWarnings("unused")
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALELISM = 1;
+	private static final long MEMORYSIZE = 32;
+
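+	/** Each call below builds an invalid topology and must be rejected with the given exception. */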
+	@Test
+	public void wrongJobGraph() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALLELISM);
+
+		try {
+			env.fromCollection(null);
+			fail("fromCollection(null) should have failed");
+		} catch (NullPointerException e) {
+			// expected
+		}
+
+		try {
+			env.fromElements();
+			fail("fromElements() without elements should have failed");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+		try {
+			env.generateSequence(-10, -30);
+			fail("generateSequence with from > to should have failed");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+		try {
+			env.setBufferTimeout(-10);
+			fail("a negative buffer timeout should have failed");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+		try {
+			env.generateSequence(1, 10).project(2);
+			fail("project() on a non-tuple stream should have failed");
+		} catch (RuntimeException e) {
+			// expected
+		}
+	}
+
+	private static class CoMap implements CoMapFunction<String, Long, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) {
+			return value;
+		}
+
+		@Override
+		public String map2(Long value) {
+			return value.toString();
+		}
+	}
+
+	static HashSet<String> resultSet;
+
+	private static class SetSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) {
+			resultSet.add(value);
+		}
+	}
+
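+	/** Connects a String stream and a Long stream through a CoMap and checks the merged output. */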
+	@Test
+	public void coTest() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALLELISM, MEMORYSIZE);
+
+		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+
+		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
+
+		resultSet = new HashSet<String>();
+		env.execute();
+
+		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+				"2", "3"));
+		assertEquals(expectedSet, resultSet);
+	}
+
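+	/** Runs a source -> map -> sink pipeline and checks that all ten (i, i + 1) pairs arrive. */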
+	@Test
+	public void runStream() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALLELISM, MEMORYSIZE);
+
+		env.addSource(new MySource()).setParallelism(SOURCE_PARALLELISM).map(new MyTask()).addSink(new MySink());
+
+		env.execute();
+		assertEquals(10, data.size());
+
+		for (Integer k : data.keySet()) {
+			assertEquals((Integer) (k + 1), data.get(k));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
new file mode 100644
index 0000000..e12b254
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
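+/**
+ * Checks CosineDistance against precomputed reference values. The cosine
+ * distance of two vectors a and b is 1 - (a . b) / (|a| |b|); inputs that
+ * involve a zero vector are expected to yield a distance of 0.
+ */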
+public class CosineDistanceTest {
+	
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testCosineDistance() {
+		
+		// Reference values calculated with Wolfram Alpha
+		double[][][] testdata={
+				{{0,0,0},{0,0,0}},
+				{{0,0,0},{1,2,3}},
+				{{1,2,3},{0,0,0}},
+				{{1,2,3},{4,5,6}},
+				{{1,2,3},{-4,-5,-6}},
+				{{1,2,-3},{-4,5,-6}},
+				{{1,2,3,4},{5,6,7,8}},
+				{{1,2},{3,4}},
+				{{1},{2}},
+			};
+		double[] referenceSolutions={
+				0,
+				0,
+				0,
+				0.025368,
+				1.974631,
+				0.269026,
+				0.031136,
+				0.016130,
+				0
+		};
+		
+		for (int i = 0; i < testdata.length; i++) {
+			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
+					+ arrayToString(testdata[i][1]), referenceSolutions[i],
+					new CosineDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
+		}
+	}
+	
+	private String arrayToString(double[] in) {
+		if (in.length == 0) {
+			return "{}";
+		}
+		StringBuilder result = new StringBuilder("{");
+		for (double d : in) {
+			result.append(d).append(",");
+		}
+		return result.substring(0, result.length() - 1) + "}";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
new file mode 100644
index 0000000..8c62497
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
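+/**
+ * Checks EuclideanDistance against precomputed reference values, i.e. the
+ * square root of the sum of squared componentwise differences.
+ */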
+public class EuclideanDistanceTest {
+	
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testEuclideanDistance() {
+		
+		// Reference values calculated with Wolfram Alpha
+		double[][][] testdata={
+				{{0,0,0},{0,0,0}},
+				{{0,0,0},{1,2,3}},
+				{{1,2,3},{0,0,0}},
+				{{1,2,3},{4,5,6}},
+				{{1,2,3},{-4,-5,-6}},
+				{{1,2,-3},{-4,5,-6}},
+				{{1,2,3,4},{5,6,7,8}},
+				{{1,2},{3,4}},
+				{{1},{2}},
+			};
+		double[] referenceSolutions={
+				0,
+				3.741657,
+				3.741657,
+				5.196152,
+				12.4499,
+				6.557439,
+				8.0,
+				2.828427,
+				1
+		};
+		
+		for (int i = 0; i < testdata.length; i++) {
+			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
+					+ arrayToString(testdata[i][1]), referenceSolutions[i],
+					new EuclideanDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
+		}
+	}
+	
+	private String arrayToString(double[] in) {
+		if (in.length == 0) {
+			return "{}";
+		}
+		StringBuilder result = new StringBuilder("{");
+		for (double d : in) {
+			result.append(d).append(",");
+		}
+		return result.substring(0, result.length() - 1) + "}";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
new file mode 100644
index 0000000..17d3974
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.junit.Before;
+import org.junit.Test;
+
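+/**
+ * Tests ArrayFromTuple, which copies the fields of a Tuple into an Object[],
+ * either all fields in order or only user-selected fields in a user-defined
+ * order (duplicates allowed).
+ */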
+public class ArrayFromTupleTest {
+
+	private String[] testStrings;
+
+	@Before
+	public void init() {
+		testStrings = new String[Tuple.MAX_ARITY];
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			testStrings[i] = Integer.toString(i);
+		}
+	}
+
+	@Test
+	public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
+			String[] currentArray = new String[i + 1];
+			for (int j = 0; j <= i; j++) {
+				currentTuple.setField(testStrings[j], j);
+				currentArray[j] = testStrings[j];
+			}
+			arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
+		}
+	}
+
+	@Test
+	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			currentTuple.setField(testStrings[i], i);
+		}
+
+		String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
+				testStrings[0] };
+		arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
+
+		String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
+		arrayEqualityCheck(expected2,
+				new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
+
+		String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
+		arrayEqualityCheck(expected3,
+				new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
+
+		String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
+				testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
+				testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
+				testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
+				testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
+		arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
+				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
+	}
+
+	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		for (int i = 0; i < array1.length; i++) {
+			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
+		}
+	}
+
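+	/** Tuple classes indexed by arity - 1: CLASSES[0] is Tuple1, CLASSES[24] is Tuple25. */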
+	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
new file mode 100644
index 0000000..82a876a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Before;
+import org.junit.Test;
+
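+/**
+ * Tests ConcatinatedExtract, which chains extractors so that the output of one
+ * extraction becomes the input of the next; test2 traces the intermediate
+ * types in its inline comments.
+ */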
+public class ConcatinatedExtractTest {
+
+	private String[] testStringArray1 = { "1", "2", "3" };
+	private int[] testIntArray1 = { 1, 2, 3 };
+	private String[] testStringArray2 = { "4", "5", "6" };
+	private int[] testIntArray2 = { 4, 5, 6 };
+	private String[] testStringArray3 = { "7", "8", "9" };
+	private int[] testIntArray3 = { 7, 8, 9 };
+	private Tuple2<String[], int[]>[] testTuple2Array;
+	private Tuple2<String[], int[]> testTuple2;
+	private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void setupData() {
+		testTuple2Array = new Tuple2[2];
+		testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
+		testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
+
+		testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
+
+		testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
+				testTuple2Array);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void test1() {
+		Extractor ext = new ConcatinatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
+				.add(new FieldsFromArray(Integer.class, 2, 1, 0));
+		int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
+		assertEquals(Integer.valueOf(expected[0]), ((Integer[]) ext.extract(testData))[0]);
+		assertEquals(Integer.valueOf(expected[1]), ((Integer[]) ext.extract(testData))[1]);
+		assertEquals(Integer.valueOf(expected[2]), ((Integer[]) ext.extract(testData))[2]);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void test2() {
+		Extractor ext = new ConcatinatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
+				new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
+				.add(new FieldFromArray(0)) // Tuple2<String[],int[]>
+				.add(new ArrayFromTuple(0)) // Object[] (Containing String[])
+				.add(new FieldFromArray(0)) // String[]
+				.add(new FieldFromArray(1)); // String
+
+		String expected2 = testStringArray2[1];
+		assertEquals(expected2, ext.extract(testData));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
new file mode 100644
index 0000000..2d4dbcf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
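+/**
+ * Tests FieldFromArray, which extracts the element at a fixed index from an
+ * array; elements of a primitive int[] come back boxed as Integer.
+ */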
+public class FieldFromArrayTest {
+
+	String[] testStringArray = { "0", "1", "2", "3", "4" };
+	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+	int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+	@Test
+	public void testStringArray() {
+		for (int i = 0; i < this.testStringArray.length; i++) {
+			assertEquals(this.testStringArray[i],
+					new FieldFromArray<String>(i).extract(testStringArray));
+		}
+	}
+
+	@Test
+	public void testIntegerArray() {
+		for (int i = 0; i < this.testIntegerArray.length; i++) {
+			assertEquals(this.testIntegerArray[i],
+					new FieldFromArray<String>(i).extract(testIntegerArray));
+		}
+	}
+
+	@Test
+	public void testIntArray() {
+		for (int i = 0; i < this.testIntArray.length; i++) {
+			assertEquals(Integer.valueOf(this.testIntArray[i]),
+					new FieldFromArray<Integer>(i).extract(testIntArray));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
new file mode 100644
index 0000000..528611a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.junit.Before;
+import org.junit.Test;
+
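+/**
+ * Tests FieldFromTuple, which extracts one field from a Tuple of any arity
+ * from Tuple1 up to Tuple25.
+ */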
+public class FieldFromTupleTest {
+
+	private String[] testStrings;
+
+	@Before
+	public void init() {
+		testStrings = new String[Tuple.MAX_ARITY];
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			testStrings[i] = Integer.toString(i);
+		}
+	}
+
+	@Test
+	public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException {
+		// extract every single field of each tuple arity
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			Tuple current = (Tuple) CLASSES[i].newInstance();
+			for (int j = 0; j <= i; j++) {
+				current.setField(testStrings[j], j);
+			}
+			for (int j = 0; j <= i; j++) {
+				assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current));
+			}
+		}
+	}
+
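+	/** Tuple classes indexed by arity - 1: CLASSES[0] is Tuple1, CLASSES[24] is Tuple25. */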
+	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
new file mode 100644
index 0000000..3139aa5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
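+/**
+ * Tests FieldsFromArray, which copies the elements at the given indices, in
+ * the given order, into a new array of the requested component type.
+ */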
+public class FieldsFromArrayTest {
+
+	String[] testStringArray = { "0", "1", "2", "3", "4" };
+	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+	int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+	@Test
+	public void testStringArray() {
+		// check single field extraction
+		for (int i = 0; i < testStringArray.length; i++) {
+			String[] tmp = { testStringArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<String>(String.class, i).extract(testStringArray));
+		}
+
+		// check reverse order
+		String[] reverseOrder = new String[testStringArray.length];
+		for (int i = 0; i < testStringArray.length; i++) {
+			reverseOrder[i] = testStringArray[testStringArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray));
+
+		// check picking fields and reorder
+		String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray));
+	}
+
+	@Test
+	public void testIntegerArray() {
+		// check single field extraction
+		for (int i = 0; i < testIntegerArray.length; i++) {
+			Integer[] tmp = { testIntegerArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray));
+		}
+
+		// check reverse order
+		Integer[] reverseOrder = new Integer[testIntegerArray.length];
+		for (int i = 0; i < testIntegerArray.length; i++) {
+			reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0)
+						.extract(testIntegerArray));
+
+		// check picking fields and reorder
+		Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray));
+	}
+
+	@Test
+	public void testIntArray() {
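+		// check single field extraction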
+		for (int i = 0; i < testIntArray.length; i++) {
+			Integer[] tmp = { testIntArray[i] };
+			arrayEqualityCheck(tmp,
+					new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray));
+		}
+
+		// check reverse order
+		Integer[] reverseOrder = new Integer[testIntArray.length];
+		for (int i = 0; i < testIntArray.length; i++) {
+			reverseOrder[i] = testIntArray[testIntArray.length - i - 1];
+		}
+		arrayEqualityCheck(reverseOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray));
+
+		// check picking fields and reorder
+		Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] };
+		arrayEqualityCheck(crazyOrder,
+				new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray));
+	}
+
+	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		for (int i = 0; i < array1.length; i++) {
+			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
+		}
+	}
+}

