flink-commits mailing list archives

From: se...@apache.org
Subject: [47/51] [abbrv] git commit: [streaming] Updated operator test to avoid environment execution
Date: Mon, 18 Aug 2014 17:26:24 GMT
[streaming] Updated operator test to avoid environment execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fc46d4c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fc46d4c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fc46d4c9

Branch: refs/heads/master
Commit: fc46d4c9123defcdb03324a464cb818d705d6cdf
Parents: 696bce0
Author: ghermann <reckoner42@gmail.com>
Authored: Fri Aug 8 14:42:53 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        |   2 +-
 .../streaming/api/datastream/DataStream.java    |  12 +-
 .../environment/StreamExecutionEnvironment.java |   3 +-
 .../flink/streaming/util/MockCollector.java     |  41 +++
 .../flink/streaming/util/MockInvokable.java     | 106 ++++++
 .../apache/flink/streaming/util/MockSource.java |  38 +++
 .../apache/flink/streaming/api/SourceTest.java  |  54 +++
 .../api/invokable/operator/BatchReduceTest.java | 179 +++-------
 .../api/invokable/operator/FilterTest.java      |  38 +--
 .../api/invokable/operator/FlatMapTest.java     | 192 +----------
 .../api/invokable/operator/MapTest.java         | 327 +------------------
 11 files changed, 323 insertions(+), 669 deletions(-)
----------------------------------------------------------------------
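This change replaces tests that built a StreamExecutionEnvironment and ran a full job with tests that drive the invokable directly through the new MockInvokable, MockCollector and MockSource helpers. A minimal sketch of the new pattern (operator and input values are illustrative, mirroring the FilterTest further below):

	FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
	// feeds the inputs through the invokable and returns what it emitted,
	// without constructing a job graph or executing an environment
	List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));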


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 668647d..34ddf51 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -99,7 +99,7 @@ public class TwitterLocal implements Serializable {
 
 						return new Tuple2<String, Integer>(value, 1);
 					}
-				}).groupReduce(new WordCountCounter(), 0);
+				}).groupBy(0).reduce(new WordCountCounter());
 
 		dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 9f802f7..ab14bc6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -301,10 +301,8 @@ public abstract class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			int batchSize) {
-		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
-				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
-				batchSize, batchSize));
+			long batchSize) {
+		return batchReduce(reducer, batchSize, batchSize);
 	}
 
 	/**
@@ -328,7 +326,7 @@ public abstract class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			int batchSize, int slideSize) {
+			long batchSize, long slideSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
 				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
 				batchSize, slideSize));
@@ -355,9 +353,7 @@ public abstract class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
 			long windowSize) {
-		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
-				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
-				windowSize, windowSize));
+		return windowReduce(reducer, windowSize, windowSize);
 	}
 
 	/**
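With this change the single-size overloads take a long and simply delegate to the two-size variants, so a call with one size is shorthand for equal batch and slide sizes. An illustrative use (stream and reducer are placeholders, not part of this diff):

	// both calls build the same BatchReduceInvokable underneath
	stream.batchReduce(reducer, 100L);
	stream.batchReduce(reducer, 100L, 100L);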

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c357424..68e2421 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -220,7 +220,6 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
-	@SuppressWarnings("unchecked")
 	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
 
@@ -233,7 +232,7 @@ public abstract class StreamExecutionEnvironment {
 
 			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(
 					new FromElementsFunction<OUT>(data)), new ObjectTypeWrapper<OUT, Tuple, OUT>(
-					(OUT) data.toArray()[0], null, (OUT) data.toArray()[0]), "source",
+					data.iterator().next(), null, data.iterator().next()), "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
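Collection.toArray() returns an Object[], so the old code needed the unchecked (OUT) cast that the removed @SuppressWarnings annotation covered; data.iterator().next() is already typed as OUT. Roughly (the local variable is illustrative):

	OUT first = (OUT) data.toArray()[0];   // before: unchecked cast from Object
	OUT first = data.iterator().next();    // after: statically typed, no cast needed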

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
new file mode 100644
index 0000000..e200d70
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class MockCollector<T> implements Collector<T> {
+	private Collection<T> outputs;
+	
+	public MockCollector(Collection<T> outputs) {
+		this.outputs = outputs;
+	}
+
+	@Override
+	public void collect(T record) {
+		outputs.add(record);
+	}
+
+	@Override
+	public void close() {
+	}
+}
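MockCollector appends every collected record to the collection passed to its constructor, so a test can inspect the output afterwards. A small usage sketch (values are illustrative):

	List<String> output = new ArrayList<String>();
	Collector<String> collector = new MockCollector<String>(output);
	collector.collect("a");
	collector.collect("b");
	// output now holds ["a", "b"]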

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
new file mode 100644
index 0000000..91c48e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockInvokable<IN, OUT> {
+	private Collection<IN> inputs;
+	private List<OUT> outputs;
+
+	private Collector<OUT> collector;
+	private StreamRecordSerializer<IN> inDeserializer;
+	private MutableObjectIterator<StreamRecord<IN>> iterator;
+
+	public MockInvokable(Collection<IN> inputs) {
+		this.inputs = inputs;
+		if (inputs.isEmpty()) {
+			throw new RuntimeException("Inputs must not be empty");
+		}
+
+		TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
+		inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+		
+		iterator = new MockInputIterator();
+		outputs = new ArrayList<OUT>();
+		collector = new MockCollector<OUT>(outputs);
+	}
+
+
+	private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
+		Iterator<IN> listIterator;
+		
+		public MockInputIterator() {
+			listIterator = inputs.iterator();
+		}
+
+		@Override
+		public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
+			if (listIterator.hasNext()) {
+				reuse.setObject(listIterator.next());
+			} else {
+				reuse = null;
+			}
+			return reuse;
+		}
+	}
+
+	public List<OUT> getOutputs() {
+		return outputs;
+	}
+
+	public Collector<OUT> getCollector() {
+		return collector;
+	}
+
+	public StreamRecordSerializer<IN> getInDeserializer() {
+		return inDeserializer;
+	}
+
+	public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+		return iterator;
+	}
+
+	public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
+		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
+		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
+		try {
+			invokable.open(null);
+			invokable.invoke();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke invokable.", e);
+		}
+		
+		return mock.getOutputs();
+	}
+	
+}
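MockInvokable wraps a list of inputs in a MutableObjectIterator, hands the invokable a MockCollector, and returns whatever the invokable emitted, so an operator can be unit tested without a job graph or a running environment. Intended use, mirroring the rewritten tests below (the MapFunction instance here is an assumed placeholder):

	MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(myMapFunction);
	List<String> output = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3));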

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
new file mode 100644
index 0000000..cbace31
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+public class MockSource<T> {
+
+	public static <T> List<T> createAndExecute(SourceFunction<T> source) {
+		List<T> outputs = new ArrayList<T>();
+		try {
+			source.invoke(new MockCollector<T>(outputs));
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke source.", e);
+		}
+		return outputs;
+	}
+}
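MockSource runs a SourceFunction against a MockCollector and returns everything it emitted, for example:

	List<Integer> output = MockSource.createAndExecute(new FromElementsFunction<Integer>(1, 2, 3));
	// output == [1, 2, 3]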

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
new file mode 100644
index 0000000..750d846
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.source.FromElementsFunction;
+import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.util.MockSource;
+import org.junit.Test;
+
+public class SourceTest {
+	
+	@Test
+	public void fromElementsTest() {
+		List<Integer> expectedList = Arrays.asList(1, 2, 3);
+		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1, 2, 3));
+		assertEquals(expectedList, actualList);
+	}
+
+	@Test
+	public void fromCollectionTest() {
+		List<Integer> expectedList = Arrays.asList(1, 2, 3);
+		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));
+		assertEquals(expectedList, actualList);
+	}
+	
+	@Test
+	public void genSequenceTest() {
+		List<Long> expectedList = Arrays.asList(1L, 2L, 3L);
+		List<Long> actualList = MockSource.createAndExecute(new GenSequenceFunction(1, 3));
+		assertEquals(expectedList, actualList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index bf7621e..63ba627 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -22,176 +22,79 @@ package org.apache.flink.streaming.api.invokable.operator;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.streaming.util.MockInvokable;
 import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
 import org.junit.Test;
 
-public class BatchReduceTest {
-
-	private static ArrayList<Double> avgs = new ArrayList<Double>();
-	private static final int BATCH_SIZE = 5;
-	private static final int PARALLELISM = 1;
-	private static final long MEMORYSIZE = 32;
-
-	public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
-
-			Double sum = 0.;
-			Double count = 0.;
-			for (Double value : values) {
-				sum += value;
-				count++;
-			}
-			if (count > 0) {
-				out.collect(new Double(sum / count));
-			}
-		}
-	}
 
-	public static final class MySink implements SinkFunction<Double> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Double tuple) {
-			avgs.add(tuple);
-		}
-
-	}
-
-	public static final class MySource implements SourceFunction<Double> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Double> collector) {
-			for (Double i = 1.; i <= 100; i++) {
-				collector.collect(new Double(i));
-			}
-		}
-	}
+public class BatchReduceTest {
 
-	public static final class MySlidingBatchReduce implements RichFunction,
-			GroupReduceFunction<Long, String> {
+	public static final class MySlidingBatchReduce implements GroupReduceFunction<Integer, String> {
 		private static final long serialVersionUID = 1L;
 
-		double startTime;
-
 		@Override
-		public void reduce(Iterable<Long> values, Collector<String> out) throws Exception {
-			for (Long value : values) {
+		public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
+			for (Integer value : values) {
 				out.collect(value.toString());
 			}
 			out.collect(END_OF_BATCH);
 		}
+	}
 
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			startTime = (double) System.currentTimeMillis() / 1000;
-		}
-
-		@Override
-		public void close() throws Exception {
-		}
+	private final static String END_OF_BATCH = "end of batch";
+	private final static int SLIDING_BATCH_SIZE = 3;
+	private final static int SLIDE_SIZE = 2;
 
-		@Override
-		public RuntimeContext getRuntimeContext() {
-			return null;
-		}
+	@Test
+	public void slidingBatchReduceTest() {
+		BatchReduceInvokable<Integer, String> invokable = new BatchReduceInvokable<Integer, String>(
+				new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE);
 
-		@Override
-		public void setRuntimeContext(RuntimeContext t) {
-			// TODO Auto-generated method stub
+		List<String> expected = Arrays.asList("1", "2", "3", END_OF_BATCH, "3", "4", "5",
+				END_OF_BATCH, "5", "6", "7", END_OF_BATCH);
+		List<String> actual = MockInvokable.createAndExecute(invokable,
+				Arrays.asList(1, 2, 3, 4, 5, 6, 7));
 
-		}
+		assertEquals(expected, actual);
 	}
 
-	private static List<SortedSet<String>> sink = new ArrayList<SortedSet<String>>();
-	private static final String END_OF_BATCH = "end of batch";
-
-	public static final class MySlidingSink implements SinkFunction<String> {
-
+	public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
 		private static final long serialVersionUID = 1L;
 
-		SortedSet<String> currentSet = new TreeSet<String>();
-
 		@Override
-		public void invoke(String string) {
-			if (string.equals(END_OF_BATCH)) {
-				sink.add(currentSet);
-				currentSet = new TreeSet<String>();
-			} else {
-				currentSet.add(string);
+		public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
+
+			Double sum = 0.;
+			Double count = 0.;
+			for (Double value : values) {
+				sum += value;
+				count++;
+			}
+			if (count > 0) {
+				out.collect(new Double(sum / count));
 			}
 		}
 	}
-
-	private final static int SLIDING_BATCH_SIZE = 9;
-	private final static int SLIDE_SIZE = 6;
-	private static final int SEQUENCE_SIZE = 30;
-	private LocalStreamEnvironment env;
 	
-	private void slidingStream() {
-		env.generateSequence(1, SEQUENCE_SIZE)
-		.batchReduce(new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE)
-		.addSink(new MySlidingSink());
-	}
-	
-	private void slidingTest() {
-		int firstInBatch = 1;
+	private static final int BATCH_SIZE = 5;
 
-		for (SortedSet<String> set : sink) {
-			int to = Math.min(firstInBatch + SLIDING_BATCH_SIZE - 1, SEQUENCE_SIZE);
-			assertEquals(getExpectedSet(to), set);
-			firstInBatch += SLIDE_SIZE;
-		}
-	}
-	
-	private void nonSlidingStream() {
-		env.addSource(new MySource()).batchReduce(new MyBatchReduce(), BATCH_SIZE)
-		.addSink(new MySink());
-	}
-	
-	private void nonSlidingTest() {
-		for (int i = 0; i < avgs.size(); i++) {
-			assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
-		}
-	}
-	
 	@Test
-	public void test() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
-		env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
-
-		slidingStream();
-		nonSlidingStream();
+	public void nonSlidingBatchReduceTest() {
+		List<Double> inputs = new ArrayList<Double>();
+		for (Double i = 1.; i <= 100; i++) {
+			inputs.add(i);
+		}
 		
-		env.executeTest(MEMORYSIZE);
-
-		slidingTest();
-		nonSlidingTest();
-	}
+		BatchReduceInvokable<Double, Double> invokable = new BatchReduceInvokable<Double, Double>(new MyBatchReduce(), BATCH_SIZE, BATCH_SIZE);
+		
+		List<Double> avgs = MockInvokable.createAndExecute(invokable, inputs);
 
-	private SortedSet<String> getExpectedSet(int to) {
-		SortedSet<String> expectedSet = new TreeSet<String>();
-		for (int i = to; i > to - SLIDING_BATCH_SIZE; i--) {
-			expectedSet.add(Integer.toString(i));
+		for (int i = 0; i < avgs.size(); i++) {
+			assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
 		}
-		return expectedSet;
 	}
 }
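The expected values follow directly from the constants: with SLIDING_BATCH_SIZE = 3 and SLIDE_SIZE = 2 the input 1..7 is reduced in the batches (1, 2, 3), (3, 4, 5) and (5, 6, 7), each followed by the END_OF_BATCH marker; in the non-sliding test the inputs 1..100 are averaged in batches of 5, so batch i has mean 3.0 + i * BATCH_SIZE (3.0, 8.0, 13.0, ...), which is what the final assertion checks.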

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index ec625e9..152f992 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -19,32 +19,19 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.Assert;
+import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
 
 public class FilterTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	private static Set<Integer> set = new HashSet<Integer>();
-
-	private static class MySink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer value) {
-			set.add(value);
-		}
-	}
-
 	static class MyFilter implements FilterFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
@@ -54,16 +41,13 @@ public class FilterTest implements Serializable {
 		}
 	}
 
-	@Test
+	@Test 
 	public void test() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		env.fromElements(1, 2, 3, 4, 5, 6, 7).filter(new MyFilter()).addSink(new MySink());
-
-		env.execute();
+		FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
 
-		Assert.assertArrayEquals(new Integer[] { 2, 4, 6 }, set.toArray());
+		List<Integer> expected = Arrays.asList(2, 4, 6);
+		List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+		
+		assertEquals(expected, actual);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index 06f8447..fe367d3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -20,19 +20,13 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.streaming.util.MockInvokable;
 import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
 import org.junit.Test;
 
 public class FlatMapTest {
@@ -43,180 +37,20 @@ public class FlatMapTest {
 
 		@Override
 		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			out.collect(value * value);
-
-		}
-
-	}
-
-	public static final class ParallelFlatMap implements FlatMapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			numberOfElements++;
-
-		}
-
-	}
-
-	public static final class GenerateSequenceFlatMap implements FlatMapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Long value, Collector<Long> out) throws Exception {
-			out.collect(value * value);
-
-		}
-
-	}
-
-	public static final class MySink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			result.add(tuple);
-		}
-
-	}
-
-	public static final class FromElementsSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			fromElementsResult.add(tuple);
-		}
-
-	}
-
-	public static final class FromCollectionSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			fromCollectionResult.add(tuple);
-		}
-
-	}
-
-	public static final class GenerateSequenceSink implements SinkFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Long tuple) {
-			generateSequenceResult.add(tuple);
-		}
-
-	}
-
-	private static void fillExpectedList() {
-		for (int i = 0; i < 10; i++) {
-			expected.add(i * i);
-		}
-	}
-
-	private static void fillFromElementsExpected() {
-		fromElementsExpected.add(4);
-		fromElementsExpected.add(25);
-		fromElementsExpected.add(81);
-	}
-
-	private static void fillSequenceSet() {
-		for (int i = 0; i < 10; i++) {
-			sequenceExpected.add(i * i);
-		}
-	}
-
-	private static void fillLongSequenceSet() {
-		for (int i = 0; i < 10; i++) {
-			sequenceLongExpected.add((long) (i * i));
-		}
-	}
-
-	private static void fillFromCollectionSet() {
-		if (fromCollectionSet.isEmpty()) {
-			for (int i = 0; i < 10; i++) {
-				fromCollectionSet.add(i);
+			if (value % 2 == 0) {
+				out.collect(value);
+				out.collect(value * value);
 			}
 		}
 	}
 
-	private static final int PARALLELISM = 1;
-	private static final long MEMORYSIZE = 32;
-
-	private static int numberOfElements = 0;
-	private static Set<Integer> expected = new HashSet<Integer>();
-	private static Set<Integer> result = new HashSet<Integer>();
-	private static Set<Integer> fromElementsExpected = new HashSet<Integer>();
-	private static Set<Integer> fromElementsResult = new HashSet<Integer>();
-	private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
-	private static Set<Integer> sequenceExpected = new HashSet<Integer>();
-	private static Set<Long> sequenceLongExpected = new HashSet<Long>();
-	private static Set<Integer> fromCollectionResult = new HashSet<Integer>();
-	private static Set<Long> generateSequenceResult = new HashSet<Long>();
-
 	@Test
-	public void test() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
-		// flatmapTest
-
-		fillFromCollectionSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream = env.fromCollection(fromCollectionSet)
-				.flatMap(new MyFlatMap()).addSink(new MySink());
-
-		fillExpectedList();
-
-		// parallelShuffleconnectTest
-		fillFromCollectionSet();
-
-		DataStream<Integer> source = env.fromCollection(fromCollectionSet);
-		@SuppressWarnings("unused")
-		DataStream<Integer> map = source.flatMap(new ParallelFlatMap()).addSink(
-				new MySink());
-		@SuppressWarnings("unused")
-		DataStream<Integer> map2 = source.flatMap(new ParallelFlatMap()).addSink(
-				new MySink());
-
-		// fromElementsTest
-		DataStream<Integer> fromElementsMap = env.fromElements(2, 5, 9).flatMap(
-				new MyFlatMap());
-		@SuppressWarnings("unused")
-		DataStream<Integer> sink = fromElementsMap.addSink(new FromElementsSink());
-
-		fillFromElementsExpected();
-
-		// fromCollectionTest
-		fillFromCollectionSet();
-
-		DataStream<Integer> fromCollectionMap = env.fromCollection(fromCollectionSet)
-				.flatMap(new MyFlatMap());
-		@SuppressWarnings("unused")
-		DataStream<Integer> fromCollectionSink = fromCollectionMap
-				.addSink(new FromCollectionSink());
-
-		// generateSequenceTest
-		fillSequenceSet();
-
-		DataStream<Long> generateSequenceMap = env.generateSequence(0, 9).flatMap(
-				new GenerateSequenceFlatMap());
-		@SuppressWarnings("unused")
-		DataStream<Long> generateSequenceSink = generateSequenceMap
-				.addSink(new GenerateSequenceSink());
-
-		fillLongSequenceSet();
-
-		env.executeTest(MEMORYSIZE);
-
-		assertTrue(expected.equals(result));
-		assertEquals(20, numberOfElements);
-		assertEquals(fromElementsExpected, fromElementsResult);
-		assertEquals(sequenceExpected, fromCollectionResult);
-		assertEquals(sequenceLongExpected, generateSequenceResult);
+	public void flatMapTest() {
+		FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
+		
+		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
+		List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
+		
+		assertEquals(expected, actual);
 	}
 }
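The expected list follows from MyFlatMap emitting each even input followed by its square: from 1..8 the even values 2, 4, 6, 8 yield 2, 4, 4, 16, 6, 36, 8, 64, and the odd values emit nothing.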

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fc46d4c9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 55624d6..e3c7cb7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -20,333 +20,32 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
+import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
 
 public class MapTest {
 
-	public static final class MySource implements SourceFunction<Integer> {
+	private static class Map implements MapFunction<Integer, String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				collector.collect(i);
-			}
+		public String map(Integer value) throws Exception {
+			return "+" + (value + 1);
 		}
 	}
-
-	public static final class MySource1 implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			for (int i = 0; i < 5; i++) {
-				collector.collect(i);
-			}
-		}
-	}
-
-	public static final class MySource2 implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			for (int i = 5; i < 10; i++) {
-				collector.collect(i);
-			}
-		}
-	}
-
-	public static final class MySource3 implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			for (int i = 10; i < 15; i++) {
-				collector.collect(new Integer(i));
-			}
-		}
-	}
-
-	public static final class MyMap implements MapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			map++;
-			return value * value;
-		}
-	}
-
-	public static final class MySingleJoinMap implements MapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			singleJoinSetResult.add(value);
-			return value;
-		}
-	}
-
-	public static final class MyMultipleJoinMap implements MapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			multipleJoinSetResult.add(value);
-			return value;
-		}
-	}
-
-	public static final class MyFieldsMap implements MapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private int counter = 0;
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			counter++;
-			if (counter == MAXSOURCE)
-				allInOne = true;
-			return value * value;
-		}
-	}
-
-	public static final class MyDiffFieldsMap implements MapFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private int counter = 0;
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			counter++;
-			if (counter > 3)
-				threeInAll = false;
-			return value * value;
-		}
-	}
-
-	public static final class MySink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			result.add(tuple);
-		}
-	}
-
-	public static final class MyBroadcastSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			broadcastResult++;
-		}
-	}
-
-	public static final class MyShufflesSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			shuffleResult++;
-		}
-	}
-
-	public static final class MyFieldsSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			fieldsResult++;
-		}
-	}
-
-	public static final class MyDiffFieldsSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			diffFieldsResult++;
-		}
-	}
-
-	public static final class MyGraphSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-			graphResult++;
-		}
-	}
-
-	public static final class JoinSink implements SinkFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Integer tuple) {
-		}
-	}
-
-	private static Set<Integer> expected = new HashSet<Integer>();
-	private static Set<Integer> result = new HashSet<Integer>();
-	private static int broadcastResult = 0;
-	private static int shuffleResult = 0;
-	@SuppressWarnings("unused")
-	private static int fieldsResult = 0;
-	private static int diffFieldsResult = 0;
-	@SuppressWarnings("unused")
-	private static int graphResult = 0;
-	@SuppressWarnings("unused")
-	private static int map = 0;
-	@SuppressWarnings("unused")
-	private static final int PARALLELISM = 1;
-	private static final long MEMORYSIZE = 32;
-	private static final int MAXSOURCE = 10;
-	private static boolean allInOne = false;
-	private static boolean threeInAll = true;
-	private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
-	private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
-	private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
-	private static Set<Integer> singleJoinSetExpected = new HashSet<Integer>();
-	private static Set<Integer> multipleJoinSetExpected = new HashSet<Integer>();
-	private static Set<Integer> singleJoinSetResult = new HashSet<Integer>();
-	private static Set<Integer> multipleJoinSetResult = new HashSet<Integer>();
-
-	private static void fillExpectedList() {
-		for (int i = 0; i < 10; i++) {
-			expected.add(i * i);
-		}
-	}
-
-	private static void fillFromCollectionSet() {
-		if (fromCollectionSet.isEmpty()) {
-			for (int i = 0; i < 10; i++) {
-				fromCollectionSet.add(i);
-			}
-		}
-	}
-
-	private static void fillFromCollectionFieldsSet() {
-		if (fromCollectionFields.isEmpty()) {
-			for (int i = 0; i < MAXSOURCE; i++) {
-
-				fromCollectionFields.add(5);
-			}
-		}
-	}
-
-	private static void fillFromCollectionDiffFieldsSet() {
-		if (fromCollectionDiffFieldsSet.isEmpty()) {
-			for (int i = 0; i < 9; i++) {
-				fromCollectionDiffFieldsSet.add(i);
-			}
-		}
-	}
-
-	private static void fillSingleJoinSet() {
-		for (int i = 0; i < 10; i++) {
-			singleJoinSetExpected.add(i);
-		}
-	}
-
-	private static void fillMultipleJoinSet() {
-		for (int i = 0; i < 15; i++) {
-			multipleJoinSetExpected.add(i);
-		}
-	}
-
+	
 	@Test
-	public void mapTest() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		// mapTest
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
-
-		fillFromCollectionSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream = env.fromCollection(fromCollectionSet).map(new MyMap())
-				.addSink(new MySink());
-
-		fillExpectedList();
-
-		// broadcastSinkTest
-		fillFromCollectionSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream1 = env.fromCollection(fromCollectionSet).broadcast()
-				.map(new MyMap()).addSink(new MyBroadcastSink());
-
-		// shuffleSinkTest
-		fillFromCollectionSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream2 = env.fromCollection(fromCollectionSet).map(new MyMap())
-				.setParallelism(3).addSink(new MyShufflesSink());
-
-		// fieldsMapTest
-		fillFromCollectionFieldsSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream3 = env.fromCollection(fromCollectionFields).partitionBy(0)
-				.map(new MyFieldsMap()).addSink(new MyFieldsSink());
-
-		// diffFieldsMapTest
-		fillFromCollectionDiffFieldsSet();
-
-		@SuppressWarnings("unused")
-		DataStream<Integer> dataStream4 = env.fromCollection(fromCollectionDiffFieldsSet)
-				.partitionBy(0).map(new MyDiffFieldsMap()).addSink(new MyDiffFieldsSink());
-
-		// singleConnectWithTest
-		DataStream<Integer> source1 = env.addSource(new MySource1(), 1);
-
-		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Integer> source2 = env.addSource(new MySource2(), 1).merge(source1)
-				.partitionBy(0).map(new MySingleJoinMap()).setParallelism(1)
-				.addSink(new JoinSink());
-
-		fillSingleJoinSet();
-
-		// multipleConnectWithTest
-		DataStream<Integer> source3 = env.addSource(new MySource1(), 1);
-
-		DataStream<Integer> source4 = env.addSource(new MySource2(), 1);
-
-		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Integer> source5 = env.addSource(new MySource3(), 1)
-				.merge(source3, source4).partitionBy(0).map(new MyMultipleJoinMap())
-				.setParallelism(1).addSink(new JoinSink());
-
-		env.executeTest(MEMORYSIZE);
-
-		fillMultipleJoinSet();
-
-		assertTrue(expected.equals(result));
-		assertEquals(30, broadcastResult);
-		assertEquals(10, shuffleResult);
-		assertTrue(allInOne);
-		assertTrue(threeInAll);
-		assertEquals(9, diffFieldsResult);
-		assertEquals(singleJoinSetExpected, singleJoinSetResult);
-		assertEquals(multipleJoinSetExpected, multipleJoinSetResult);
-
+	public void mapInvokableTest() {
+		MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
+		
+		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
+		List<String> actualList = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+		
+		assertEquals(expectedList, actualList);
 	}
-
 }
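The Map function returns "+" prepended to value + 1, so the inputs 1, 2, 3 map to "+2", "+3", "+4", matching the expected list asserted above.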

