flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [33/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:11 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
deleted file mode 100644
index 15d1fd5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.streaming.state.MapState;
-import org.apache.flink.streaming.state.OperatorState;
-
-public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Set<K> removedItems;
-	protected Map<K, V> updatedItems;
-	protected boolean clear;
-
-	@SuppressWarnings("unchecked")
-	public MapCheckpoint(OperatorState<Map<K, V>> operatorState) {
-		if (operatorState instanceof MapState) {
-			MapState<K, V> mapState = (MapState<K, V>) operatorState;
-
-			this.removedItems = mapState.getRemovedItems();
-			this.clear = mapState.isCleared();
-
-			this.updatedItems = new HashMap<K, V>();
-			for (K key : mapState.getUpdatedItems()) {
-				this.updatedItems.put(key, mapState.get(key));
-			}
-			this.checkpointedState = this.updatedItems;
-
-		} else {
-			throw new RuntimeException("MapCheckpoint can only be used with MapState");
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public StateCheckpoint<Map<K, V>> update(StateCheckpoint<Map<K, V>> nextCheckpoint) {
-		MapCheckpoint<K, V> mapCheckpoint = (MapCheckpoint<K, V>) nextCheckpoint;
-		if (this.checkpointedState == null) {
-			this.checkpointedState = mapCheckpoint.updatedItems;
-		} else {
-			if (mapCheckpoint.clear) {
-				this.checkpointedState.clear();
-			}
-			for (Object key : mapCheckpoint.removedItems) {
-				this.checkpointedState.remove(key);
-			}
-			this.checkpointedState.putAll(mapCheckpoint.updatedItems);
-		}
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
deleted file mode 100644
index 8b76245..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.state.OperatorState;
-
-/**
- * Base class for creating checkpoints for {@link OperatorState}. This
- * checkpoints will be used to backup states in stateful Flink operators and
- * also to restore them in case of node failure. To allow incremental
- * checkpoints override the {@link #update(StateCheckpoint)} method.
- * 
- * @param <T>
- *            The type of the state.
- */
-public class StateCheckpoint<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	T checkpointedState;
-
-	/**
-	 * Creates a state checkpoint from the given {@link OperatorState}
-	 * 
-	 * @param operatorState
-	 *            The {@link OperatorState} to checkpoint.
-	 */
-	public StateCheckpoint(OperatorState<T> operatorState) {
-		this.checkpointedState = operatorState.getState();
-	}
-
-	public StateCheckpoint() {
-		this.checkpointedState = null;
-	}
-
-	/**
-	 * Returns the state object for the checkpoint.
-	 * 
-	 * @return The checkpointed state object.
-	 */
-	public T getCheckpointedState() {
-		return checkpointedState;
-	}
-
-	/**
-	 * Updates the checkpoint from next one. Override this method to allow
-	 * incremental updates.
-	 * 
-	 * @param nextCheckpoint
-	 *            The {@link StateCheckpoint} will be used to update from.
-	 */
-	public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
-		this.checkpointedState = nextCheckpoint.getCheckpointedState();
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return checkpointedState.toString();
-	}
-
-	public boolean stateEquals(StateCheckpoint<T> other) {
-		return checkpointedState.equals(other.checkpointedState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
deleted file mode 100755
index 691b111..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import akka.actor.ActorRef;
-
-public class ClusterUtil {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
-	public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
-
-	/**
-	 * Executes the given JobGraph locally, on a FlinkMiniCluster
-	 * 
-	 * @param jobGraph
-	 *            jobGraph
-	 * @param degreeOfParallelism
-	 *            numberOfTaskTrackers
-	 * @param memorySize
-	 *            memorySize
-	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize)
-			throws Exception {
-
-		Configuration configuration = jobGraph.getJobConfiguration();
-
-		LocalFlinkMiniCluster exec = null;
-
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running on mini cluster");
-		}
-
-		try {
-			exec = new LocalFlinkMiniCluster(configuration, true);
-			ActorRef jobClient = exec.getJobClient();
-
-			JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
-
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			if (exec != null) {
-				exec.stop();
-			}
-		}
-	}
-
-	public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
-		runOnMiniCluster(jobGraph, numOfSlots, -1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
deleted file mode 100644
index 73e6360..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import java.lang.reflect.Array;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-
-public class KeySelectorUtil {
-
-	public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
-			Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
-			Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class,
-			Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class,
-			Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
-			Tuple25.class };
-
-	public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo) {
-		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-		int keyLength = logicalKeyPositions.length;
-		boolean[] orders = new boolean[keyLength];
-		TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
-				logicalKeyPositions, orders, 0);
-		return new ComparableKeySelector<X>(comparator, keyLength);
-	}
-
-	public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
-
-		private static final long serialVersionUID = 1L;
-
-		private TypeComparator<IN> comparator;
-		private int keyLength;
-		private Object[] keyArray;
-		private Tuple key;
-
-		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
-			this.comparator = comparator;
-			this.keyLength = keyLength;
-			keyArray = new Object[keyLength];
-			try {
-				key = (Tuple) tupleClasses[keyLength - 1].newInstance();
-			} catch (Exception e) {
-			}
-		}
-
-		@Override
-		public Tuple getKey(IN value) throws Exception {
-			comparator.extractKeys(value, keyArray, 0);
-			for (int i = 0; i < keyLength; i++) {
-				key.setField(keyArray[i], i);
-			}
-			return key;
-		}
-
-	}
-
-	public static class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
-
-		private static final long serialVersionUID = 1L;
-
-		Tuple key;
-		int[] fields;
-
-		public ArrayKeySelector(int... fields) {
-			this.fields = fields;
-			try {
-				key = (Tuple) tupleClasses[fields.length - 1].newInstance();
-			} catch (Exception e) {
-			}
-		}
-
-		@Override
-		public Tuple getKey(IN value) throws Exception {
-			for (int i = 0; i < fields.length; i++) {
-				int pos = fields[i];
-				key.setField(Array.get(value, fields[pos]), i);
-			}
-			return key;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
deleted file mode 100644
index 49cd497..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.junit.Test;
-
-public class AggregationFunctionTest {
-
-	@Test
-	public void groupSumIntegerTest() {
-
-		List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Integer> expectedSumList0 = new ArrayList<Integer>();
-		List<Integer> expectedMinList0 = new ArrayList<Integer>();
-		List<Integer> expectedMaxList0 = new ArrayList<Integer>();
-		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>();
-		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
-
-		List<Integer> simpleInput = new ArrayList<Integer>();
-
-		int groupedSum0 = 0;
-		int groupedSum1 = 0;
-		int groupedSum2 = 0;
-
-		for (int i = 0; i < 9; i++) {
-			simpleInput.add(i);
-			expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2));
-			expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0));
-			expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-
-			expectedSumList0.add((i + 1) * i / 2);
-			expectedMaxList0.add(i);
-			expectedMinList0.add(0);
-
-			int groupedSum;
-			switch (i % 3) {
-			case 0:
-				groupedSum = groupedSum0 += i;
-				break;
-			case 1:
-				groupedSum = groupedSum1 += i;
-				break;
-			default:
-				groupedSum = groupedSum2 += i;
-				break;
-			}
-
-			expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum));
-			expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3));
-			expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i));
-		}
-
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
-		TypeInformation<Integer> type2 = TypeExtractor.getForObject(2);
-
-		ReduceFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregator.getSumFunction(1,
-				Integer.class, type1);
-		ReduceFunction<Integer> sumFunction0 = SumAggregator
-				.getSumFunction(0, Integer.class, type2);
-		ReduceFunction<Tuple2<Integer, Integer>> minFunction = ComparableAggregator.getAggregator(
-				1, type1, AggregationType.MIN);
-		ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2,
-				AggregationType.MIN);
-		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = ComparableAggregator.getAggregator(
-				1, type1, AggregationType.MAX);
-		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2,
-				AggregationType.MAX);
-		List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
-
-		List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
-
-		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(1, 1));
-
-		KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
-				typeInfo);
-
-		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector),
-				getInputList());
-
-		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, keySelector),
-				getInputList());
-
-		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
-				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, keySelector),
-				getInputList());
-
-		assertEquals(expectedSumList, sumList);
-		assertEquals(expectedMinList, minList);
-		assertEquals(expectedMaxList, maxList);
-		assertEquals(expectedGroupSumList, groupedSumList);
-		assertEquals(expectedGroupMinList, groupedMinList);
-		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockContext.createAndExecute(
-				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-		try {
-			env.generateSequence(1, 100).min(1);
-			fail();
-		} catch (Exception e) {
-			// Nothing to do here
-		}
-		try {
-			env.generateSequence(1, 100).min(2);
-			fail();
-		} catch (Exception e) {
-			// Nothing to do here
-		}
-		try {
-			env.generateSequence(1, 100).min(3);
-			fail();
-		} catch (Exception e) {
-			// Nothing to do here
-		}
-
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, false);
-
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, false);
-
-		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
-		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
-		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
-		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-
-		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-				getInputList()));
-		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
-				getInputList()));
-		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
-				getInputList()));
-		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
-				getInputList()));
-
-	}
-
-	@Test
-	public void minMaxByTest() {
-		TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor
-				.getForObject(new Tuple2<Integer, Integer>(0, 0));
-
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MAXBY, false);
-
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, true);
-		ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator
-				.getAggregator(0, type1, AggregationType.MINBY, false);
-
-		List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
-
-		List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
-		maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
-
-		List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
-
-		List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
-
-		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
-				getInputList()));
-		assertEquals(maxByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
-				getInputList()));
-		assertEquals(minByLastExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
-				getInputList()));
-		assertEquals(minByFirstExpected, MockContext.createAndExecute(
-				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
-				getInputList()));
-	}
-
-	private List<Tuple2<Integer, Integer>> getInputList() {
-		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>();
-		for (int i = 0; i < 9; i++) {
-			inputList.add(new Tuple2<Integer, Integer>(i % 3, i));
-		}
-		return inputList;
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
deleted file mode 100644
index 6ad827a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class IterateTest {
-
-	private static final long MEMORYSIZE = 32;
-	private static boolean iterated = false;
-
-	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
-			if (value) {
-				iterated = true;
-			} else {
-				out.collect(value);
-			}
-
-		}
-
-	}
-
-	public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
-			out.collect(true);
-
-		}
-
-	}
-
-	public static final class MySink implements SinkFunction<Boolean> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Boolean tuple) {
-		}
-
-	}
-
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-
-		env.setBufferTimeout(10);
-
-		DataStream<Boolean> source = env.fromElements(false, false, false);
-
-		IterativeDataStream<Boolean> iteration = source.iterate(3000);
-
-		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
-				new IterationTail());
-
-		iteration.closeWith(increment).addSink(new MySink());
-
-		env.execute();
-
-		assertTrue(iterated);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
deleted file mode 100644
index 2486715..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class OutputSplitterTest {
-
-	private static final long MEMORYSIZE = 32;
-
-	private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
-	private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
-
-
-	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testOnMergedDataStream() throws Exception {
-		splitterResult1.clear();
-		splitterResult2.clear();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
-		DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
-
-		d1 = d1.merge(d2);
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 8354166915727490130L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value > 4) {
-					s.add(">");
-				} else {
-					s.add("<");
-				}
-				return s;
-			}
-		}).select(">").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = 5827187510526388104L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult1.add(value);
-			}
-		});
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = -6822487543355994807L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 3 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(new SinkFunction<Integer>() {
-			private static final long serialVersionUID = -2674335071267854599L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult2.add(value);
-			}
-		});
-		env.execute();
-
-		Collections.sort(splitterResult1);
-		Collections.sort(splitterResult2);
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
-		assertEquals(expectedSplitterResult, splitterResult1);
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
-		assertEquals(expectedSplitterResult, splitterResult2);
-	}
-
-	@Test
-	public void testOnSingleDataStream() throws Exception {
-		splitterResult1.clear();
-		splitterResult2.clear();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
-
-		ds.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 2524335410904414121L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 2 == 0) {
-					s.add("even");
-				} else {
-					s.add("odd");
-				}
-				return s;
-			}
-		}).select("even").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = -2995092337537209535L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult1.add(value);
-			}
-		});
-
-		ds.split(new OutputSelector<Integer>() {
-
-			private static final long serialVersionUID = -511693919586034092L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 4 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = -1749077049727705424L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult2.add(value);
-			}
-		});
-		env.execute();
-
-		Collections.sort(splitterResult1);
-		Collections.sort(splitterResult2);
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
-		assertEquals(expectedSplitterResult, splitterResult1);
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,4,8));
-		assertEquals(expectedSplitterResult, splitterResult2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
deleted file mode 100755
index 757f6f6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class PrintTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final long MEMORYSIZE = 32;
-
-	private static final class IdentityMap implements MapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long map(Long value) throws Exception {
-			return value;
-		}
-	}
-
-	private static final class FilterAll implements FilterFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Long value) throws Exception {
-			return true;
-		}
-	}
-
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
deleted file mode 100644
index 9be7de6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
-import org.apache.flink.streaming.util.MockCollector;
-import org.apache.flink.streaming.util.MockSource;
-import org.junit.Test;
-
-public class SourceTest {
-
-	@Test
-	public void fromElementsTest() {
-		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
-				2, 3));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void fromCollectionTest() {
-		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
-				Arrays.asList(1, 2, 3)));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void socketTextStreamTest() throws Exception {
-		List<String> expectedList = Arrays.asList("a", "b", "c");
-		List<String> actualList = new ArrayList<String>();
-
-		byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
-
-		Socket socket = mock(Socket.class);
-		when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
-		when(socket.isClosed()).thenReturn(false);
-		when(socket.isConnected()).thenReturn(true);
-
-		new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(
-				actualList), socket);
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
deleted file mode 100644
index 3da6b5f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class WindowCrossJoinTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final long MEMORYSIZE = 32;
-
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.setBufferTimeout(1);
-
-		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
-		ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
-
-		in1.add(new Tuple2<Integer, String>(10, "a"));
-		in1.add(new Tuple2<Integer, String>(20, "b"));
-		in1.add(new Tuple2<Integer, String>(20, "x"));
-		in1.add(new Tuple2<Integer, String>(0, "y"));
-
-		in2.add(new Tuple1<Integer>(0));
-		in2.add(new Tuple1<Integer>(5));
-		in2.add(new Tuple1<Integer>(20));
-
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 20));
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 20));
-		joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 0));
-
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(10, "a"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "b"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(20, "x"), 20));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 0));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 5));
-		crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(
-				new Tuple2<Integer, String>(0, "y"), 20));
-
-		DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
-		DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
-
-		inStream1
-				.join(inStream2)
-				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
-				.addSink(new JoinResultSink());
-
-		inStream1
-				.cross(inStream2)
-				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>(), 100)
-				.with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(
-							Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
-						return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
-					}
-				}).addSink(new CrossResultSink());
-
-		env.execute();
-
-		assertEquals(joinExpectedResults, joinResults);
-		assertEquals(crossExpectedResults, crossResults);
-	}
-
-	private static class MyTimestamp<T> implements Timestamp<T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(T value) {
-			return 101L;
-		}
-	}
-
-	private static class JoinResultSink implements
-			SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
-			joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
-		}
-	}
-
-	private static class CrossResultSink implements
-			SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
-			crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
deleted file mode 100644
index 78cbbe5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class DirectedOutputTest {
-
-	private static final String TEN = "ten";
-	private static final String ODD = "odd";
-	private static final String ALL = "all";
-	private static final String EVEN_AND_ODD = "evenAndOdd";
-	private static final String ODD_AND_TEN = "oddAndTen";
-	private static final String EVEN = "even";
-	private static final String NON_SELECTED = "nonSelected";
-
-	static final class MyOutputSelector implements OutputSelector<Long> {
-		private static final long serialVersionUID = 1L;
-
-		List<String> outputs = new ArrayList<String>();
-
-		@Override
-		public Iterable<String> select(Long value) {
-			outputs.clear();
-			if (value % 2 == 0) {
-				outputs.add(EVEN);
-			} else {
-				outputs.add(ODD);
-			}
-
-			if (value == 10L) {
-				outputs.add(TEN);
-			}
-
-			if (value == 11L) {
-				outputs.add(NON_SELECTED);
-			}
-			return outputs;
-		}
-	}
-
-	static final class ListSink implements SinkFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		private String name;
-		private transient List<Long> list;
-
-		public ListSink(String name) {
-			this.name = name;
-		}
-
-		@Override
-		public void invoke(Long value) {
-			list.add(value);
-		}
-
-		private void readObject(java.io.ObjectInputStream in) throws IOException,
-				ClassNotFoundException {
-			in.defaultReadObject();
-			outputs.put(name, new ArrayList<Long>());
-			this.list = outputs.get(name);
-		}
-	}
-
-	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-
-	@Test
-	public void outputSelectorTest() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);
-
-		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
-		source.select(EVEN).addSink(new ListSink(EVEN));
-		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
-		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
-		source.addSink(new ListSink(ALL));
-
-		env.execute();
-		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				outputs.get(EVEN_AND_ODD));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
deleted file mode 100755
index 1615a45..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterable<String> select(Tuple1<Integer> tuple) {
-
-			String[] outputs = new String[tuple.f0];
-
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs[i] = i.toString();
-			}
-			return Arrays.asList(outputs);
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
deleted file mode 100755
index 49b3bf8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
-import org.apache.flink.streaming.util.MockRecordWriterFactory;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class StreamCollectorTest {
-
-	@Test
-	public void testCollect() {
-		MockRecordWriter recWriter = MockRecordWriterFactory.create();
-		SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
-				null);
-		sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
-
-		Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, 2, sd);
-		collector.collect(new Tuple1<Integer>(3));
-		collector.collect(new Tuple1<Integer>(4));
-		collector.collect(new Tuple1<Integer>(5));
-		collector.collect(new Tuple1<Integer>(6));
-
-		assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray());
-	}
-
-	@Test
-	public void testClose() {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
deleted file mode 100644
index a91bd0c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoFlatMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(String value, Collector<String> coll) {
-			for (int i = 0; i < value.length(); i++) {
-				coll.collect(value.substring(i, i + 1));
-			}
-		}
-
-		@Override
-		public void flatMap2(Integer value, Collector<String> coll) {
-			coll.collect(value.toString());
-		}
-	}
-
-	@Test
-	public void coFlatMapTest() {
-		CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
-				new MyCoFlatMap());
-
-		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
-				"e", "3", "4", "5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
-
-		assertEquals(expectedList, actualList);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void multipleInputTest() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-		
-		try {
-			ds1.forward().merge(ds2);
-			fail();
-		} catch (RuntimeException e) {
-			// expected
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
deleted file mode 100644
index 273bbae..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedReduceTest {
-
-	private final static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-				Tuple3<String, String, String> value2) {
-			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-		}
-
-		@Override
-		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple3<String, String, String> value) {
-			return value.f1;
-		}
-
-		@Override
-		public String map2(Tuple2<Integer, Integer> value) {
-			return value.f1.toString();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void coGroupedReduceTest() {
-		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f2;
-			}
-		};
-
-		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector0, keySelector1);
-
-		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-				"7");
-
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-
-		invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector2, keySelector1);
-
-		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-
-		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
-				Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
deleted file mode 100644
index 93d1741..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(Double value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(Integer value) {
-			return value.toString();
-		}
-	}
-
-	@Test
-	public void coMapTest() {
-		CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
-
-		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
-		
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
deleted file mode 100755
index 3343ba0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoStreamReduceTest {
-
-	public static class MyCoReduceFunction implements
-			CoReduceFunction<Integer, String, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			return value1 * value2;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			return value1 + value2;
-		}
-
-		@Override
-		public Integer map1(Integer value) {
-			return value;
-		}
-
-		@Override
-		public Integer map2(String value) {
-			return Integer.parseInt(value);
-		}
-
-	}
-
-	@Test
-	public void coStreamReduceTest() {
-
-		CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
-				new MyCoReduceFunction());
-
-		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-		List<Integer> result = MockCoContext.createAndExecute(coReduce,
-				Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
-
-		assertEquals(expected1, result);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
deleted file mode 100644
index 4ab3492..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoWindowTest {
-
-	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@SuppressWarnings("unused")
-		@Override
-		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-				throws Exception {
-			Integer count1 = 0;
-			for (Integer i : first) {
-				count1++;
-			}
-			Integer count2 = 0;
-			for (Integer i : second) {
-				count2++;
-			}
-			out.collect(count1);
-			out.collect(count2);
-
-		}
-
-	}
-
-	public static final class MyCoGroup2 implements
-			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coWindow(List<Tuple2<Integer, Integer>> first,
-				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-
-			Set<Integer> firstElements = new HashSet<Integer>();
-			for (Tuple2<Integer, Integer> value : first) {
-				firstElements.add(value.f1);
-			}
-			for (Tuple2<Integer, Integer> value : second) {
-				if (firstElements.contains(value.f1)) {
-					out.collect(value.f1);
-				}
-			}
-
-		}
-
-	}
-
-	private static final class MyTS1 implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value;
-		}
-
-	}
-
-	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Tuple2<Integer, Integer> value) {
-			return value.f0;
-		}
-
-	}
-
-	@Test
-	public void coWindowGroupReduceTest2() throws Exception {
-
-		CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
-				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
-				new TimestampWrapper<Integer>(new MyTS1(), 1));
-
-		// Windowsize 2, slide 1
-		// 1,2|2,3|3,4|4,5
-
-		List<Integer> input11 = new ArrayList<Integer>();
-		input11.add(1);
-		input11.add(1);
-		input11.add(2);
-		input11.add(3);
-		input11.add(3);
-
-		List<Integer> input12 = new ArrayList<Integer>();
-		input12.add(1);
-		input12.add(2);
-		input12.add(3);
-		input12.add(3);
-		input12.add(5);
-
-		// Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
-		// expected output: 3,2|3,3|2,2|0,1
-
-		List<Integer> expected1 = new ArrayList<Integer>();
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(3);
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(2);
-		expected1.add(0);
-		expected1.add(1);
-
-		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
-		assertEquals(expected1, actual1);
-
-		CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
-						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-
-		// WindowSize 2, slide 3
-		// 1,2|4,5|7,8|
-
-		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
-		input21.add(new Tuple2<Integer, Integer>(1, 1));
-		input21.add(new Tuple2<Integer, Integer>(1, 2));
-		input21.add(new Tuple2<Integer, Integer>(2, 3));
-		input21.add(new Tuple2<Integer, Integer>(3, 4));
-		input21.add(new Tuple2<Integer, Integer>(3, 5));
-		input21.add(new Tuple2<Integer, Integer>(4, 6));
-		input21.add(new Tuple2<Integer, Integer>(4, 7));
-		input21.add(new Tuple2<Integer, Integer>(5, 8));
-
-		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
-		input22.add(new Tuple2<Integer, Integer>(1, 1));
-		input22.add(new Tuple2<Integer, Integer>(2, 0));
-		input22.add(new Tuple2<Integer, Integer>(2, 2));
-		input22.add(new Tuple2<Integer, Integer>(3, 9));
-		input22.add(new Tuple2<Integer, Integer>(3, 4));
-		input22.add(new Tuple2<Integer, Integer>(4, 10));
-		input22.add(new Tuple2<Integer, Integer>(5, 8));
-		input22.add(new Tuple2<Integer, Integer>(5, 7));
-
-		List<Integer> expected2 = new ArrayList<Integer>();
-		expected2.add(1);
-		expected2.add(2);
-		expected2.add(8);
-		expected2.add(7);
-
-		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
-		assertEquals(expected2, actual2);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
deleted file mode 100644
index 969a06b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class CounterInvokableTest {
-
-	@Test
-	public void counterTest() {
-		CounterInvokable<String> invokable = new CounterInvokable<String>();
-
-		List<Long> expected = Arrays.asList(1L, 2L, 3L);
-		List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
-		
-		assertEquals(expected, actual);
-	}
-}


Mime
View raw message