flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [36/51] [abbrv] git commit: [streaming] Updated API to use RichFunctions
Date Mon, 18 Aug 2014 17:26:13 GMT
[streaming] Updated API to use RichFunctions


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/776bd3f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/776bd3f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/776bd3f6

Branch: refs/heads/master
Commit: 776bd3f6dbfd85db978ff4d5785935a19f43d77b
Parents: fed03a2
Author: gyfora <gyula.fora@gmail.com>
Authored: Sun Aug 3 15:51:46 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  |  2 +-
 .../apache/flink/streaming/api/DataStream.java  | 96 +++++++++++---------
 .../flink/streaming/api/StreamConfig.java       |  8 +-
 .../api/StreamExecutionEnvironment.java         | 52 ++++-------
 .../api/function/co/CoMapFunction.java          |  5 +-
 .../api/function/sink/SinkFunction.java         |  4 +-
 .../api/function/source/SourceFunction.java     |  4 +-
 .../streaming/api/invokable/SinkInvokable.java  | 11 +++
 .../api/invokable/SourceInvokable.java          | 14 ++-
 .../api/invokable/StreamComponentInvokable.java | 15 ++-
 .../operator/BatchReduceInvokable.java          | 13 ++-
 .../api/invokable/operator/FilterInvokable.java | 17 +++-
 .../invokable/operator/FlatMapInvokable.java    | 17 +++-
 .../api/invokable/operator/MapInvokable.java    | 17 +++-
 .../operator/StreamReduceInvokable.java         | 31 ++++++-
 .../operator/WindowReduceInvokable.java         | 13 ++-
 .../invokable/operator/co/CoMapInvokable.java   | 11 +++
 .../api/streamcomponent/CoStreamTask.java       | 12 ++-
 .../api/streamcomponent/StreamSink.java         |  2 +
 .../api/streamcomponent/StreamSource.java       |  2 +
 .../api/streamcomponent/StreamTask.java         |  2 +
 .../util/serialization/FunctionTypeWrapper.java | 10 +-
 .../apache/flink/streaming/api/IterateTest.java | 16 ++--
 .../apache/flink/streaming/api/PrintTest.java   |  2 +-
 .../api/collector/DirectedOutputTest.java       |  4 +-
 .../api/invokable/operator/BatchReduceTest.java | 19 ++--
 .../api/invokable/operator/FilterTest.java      |  4 +-
 .../api/invokable/operator/FlatMapTest.java     |  8 +-
 .../api/invokable/operator/MapTest.java         | 12 +--
 .../streamcomponent/StreamComponentTest.java    | 10 +-
 .../serialization/TypeSerializationTest.java    |  6 +-
 .../examples/basictopology/BasicTopology.java   |  6 +-
 .../examples/cellinfo/CellInfoLocal.java        |  8 +-
 .../examples/function/JSONParseFlatMap.java     |  8 +-
 .../flink/streaming/examples/join/JoinTask.java |  4 +-
 .../ml/IncrementalLearningSkeleton.java         |  8 +-
 .../streaming/examples/ml/IncrementalOLS.java   | 12 +--
 .../examples/window/join/WindowJoinTask.java    |  4 +-
 .../examples/wordcount/WordCountCounter.java    |  4 +-
 .../examples/wordcount/WordCountSplitter.java   |  4 +-
 40 files changed, 299 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 4225cd3..c6f0ef5 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -32,7 +32,7 @@ import com.rabbitmq.client.ConnectionFactory;
 public abstract class RMQSink<IN> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+	private static final Log LOG = LogFactory.getLog(RMQSink.class);
 
 	private boolean sendAndClose = false;
 	private boolean closeWithoutSend = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d0f1294..f17dd1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -24,11 +24,11 @@ import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
@@ -124,6 +124,14 @@ public class DataStream<T> {
 		this.iterationID = dataStream.iterationID;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 	}
+	
+
+	/**
+	 * Partitioning strategy on the stream.
+	 */
+	public static enum ConnectionType {
+		SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
+	}
 
 	/**
 	 * Initialize the connection and partitioning among the connected
@@ -133,7 +141,7 @@ public class DataStream<T> {
 		connectIDs = new ArrayList<String>();
 		connectIDs.add(getId());
 		partitioners = new ArrayList<StreamPartitioner<T>>();
-		partitioners.add(new ShufflePartitioner<T>());
+		partitioners.add(new ForwardPartitioner<T>());
 	}
 
 	/**
@@ -203,7 +211,7 @@ public class DataStream<T> {
 
 	/**
 	 * Gives the data transformation(vertex) a user defined name in order to use
-	 * at directed outputs. The {@link OutputSelector} of the input vertex
+	 * with directed outputs. The {@link OutputSelector} of the input vertex
 	 * should use this name for directed emits.
 	 * 
 	 * @param name
@@ -312,7 +320,8 @@ public class DataStream<T> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
-	 * are forwarded to the local subtask of the next component.
+	 * are forwarded to the local subtask of the next component. This is the
+	 * default partitioner setting.
 	 * 
 	 * @return The DataStream with shuffle partitioning set.
 	 */
@@ -342,19 +351,19 @@ public class DataStream<T> {
 
 	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
-	 * calls a {@link MapFunction} for each element of the DataStream. Each
+	 * calls a {@link RichMapFunction} for each element of the DataStream. Each
 	 * MapFunction call returns exactly one element.
 	 * 
 	 * @param mapper
-	 *            The MapFunction that is called for each element of the
+	 *            The RichMapFunction that is called for each element of the
 	 *            DataStream.
 	 * @param <R>
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
+	public <R> StreamOperator<T, R> map(RichMapFunction<T, R> mapper) {
 		return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
-				MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
+				RichMapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
 	}
 
 	/**
@@ -372,8 +381,8 @@ public class DataStream<T> {
 	 *            {@link CoMapFunction#map2(Tuple)}
 	 * @return The transformed DataStream
 	 */
-	public <T2, R> DataStream<R> coMapWith(
-			CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
+	public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
+			DataStream<T2> otherStream) {
 		return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
 				coMapper,
 				new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
@@ -382,81 +391,82 @@ public class DataStream<T> {
 
 	/**
 	 * Applies a FlatMap transformation on a {@link DataStream}. The
-	 * transformation calls a FlatMapFunction for each element of the
-	 * DataStream. Each FlatMapFunction call can return any number of elements
-	 * including none.
+	 * transformation calls a {@link RichFlatMapFunction} for each element of
+	 * the DataStream. Each RichFlatMapFunction call can return any number of
+	 * elements including none.
 	 * 
 	 * @param flatMapper
-	 *            The FlatMapFunction that is called for each element of the
+	 *            The RichFlatMapFunction that is called for each element of the
 	 *            DataStream
 	 * 
 	 * @param <R>
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
+	public <R> StreamOperator<T, R> flatMap(RichFlatMapFunction<T, R> flatMapper) {
 		return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
-				FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
+				RichFlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
 	}
 
 	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
-	 * transformation calls a {@link FilterFunction} for each element of the
+	 * transformation calls a {@link RichFilterFunction} for each element of the
 	 * DataStream and retains only those element for which the function returns
 	 * true. Elements for which the function returns false are filtered.
 	 * 
 	 * @param filter
-	 *            The FilterFunction that is called for each element of the
+	 *            The RichFilterFunction that is called for each element of the
 	 *            DataSet.
 	 * @return The filtered DataStream.
 	 */
-	public StreamOperator<T, T> filter(FilterFunction<T> filter) {
+	public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
 		return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
-				FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
+				RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
 	}
 
 	/**
 	 * Applies a reduce transformation on preset chunks of the DataStream. The
-	 * transformation calls a {@link GroupReduceFunction} for each tuple batch
-	 * of the predefined size. Each GroupReduceFunction call can return any
-	 * number of elements including none.
+	 * transformation calls a {@link RichGroupReduceFunction} for each tuple
+	 * batch of the predefined size. Each RichGroupReduceFunction call can
+	 * return any number of elements including none.
 	 * 
 	 * 
 	 * @param reducer
-	 *            The GroupReduceFunction that is called for each tuple batch.
+	 *            The RichGroupReduceFunction that is called for each tuple
+	 *            batch.
 	 * @param batchSize
 	 *            The number of tuples grouped together in the batch.
 	 * @param <R>
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
-			int batchSize) {
+	public <R> StreamOperator<T, R> batchReduce(RichGroupReduceFunction<T, R> reducer, int batchSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
-				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
+				RichGroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
 				batchSize));
 	}
 
 	/**
 	 * Applies a reduce transformation on preset "time" chunks of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
+	 * DataStream. The transformation calls a {@link RichGroupReduceFunction} on
 	 * records received during the predefined time window. The window shifted
-	 * after each reduce call. Each GroupReduceFunction call can return any
+	 * after each reduce call. Each RichGroupReduceFunction call can return any
 	 * number of elements including none.
 	 * 
 	 * 
 	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
+	 *            The RichGroupReduceFunction that is called for each time
+	 *            window.
 	 * @param windowSize
 	 *            The time window to run the reducer on, in milliseconds.
 	 * @param <R>
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
+	public <R> StreamOperator<T, R> windowReduce(RichGroupReduceFunction<T, R> reducer,
 			long windowSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
-				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
+				RichGroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
 				windowSize));
 	}
 
@@ -477,7 +487,7 @@ public class DataStream<T> {
 	 * @return the data stream constructed
 	 */
 	private <R> StreamOperator<T, R> addFunction(String functionName,
-			final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+			final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
 			UserTaskInvokable<T, R> functionInvokable) {
 
 		DataStream<T> inputStream = new DataStream<T>(this);
@@ -500,9 +510,9 @@ public class DataStream<T> {
 		return returnStream;
 	}
 
-	protected <T1, T2, R> DataStream<R> addCoFunction(
-			String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
-			final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
+	protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
+			DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+			final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
 			CoInvokable<T1, T2, R> functionInvokable) {
 
 		DataStream<R> returnStream = new DataStream<R>(environment, functionName);
@@ -535,7 +545,7 @@ public class DataStream<T> {
 	 * @param typeNumber
 	 *            Number of the type (used at co-functions)
 	 */
-	<X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+	private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
 			String inputID = inputStream.connectIDs.get(i);
 			StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
@@ -545,13 +555,13 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Adds the given sink to this environment. Only streams with sinks added
+	 * Adds the given sink to this DataStream. Only streams with sinks added
 	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 	 * method is called.
 	 * 
 	 * @param sinkFunction
 	 *            The object containing the sink's invoke function.
-	 * @return The modified DataStream.
+	 * @return The closed DataStream.
 	 */
 	public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
 		return addSink(new DataStream<T>(this), sinkFunction);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index a102a00..4572a84 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
@@ -257,13 +257,13 @@ public class StreamConfig {
 		return config.getInteger(INPUT_TYPE + inputNumber, 0);
 	}
 
-	public void setFunctionClass(Class<? extends AbstractFunction> functionClass) {
+	public void setFunctionClass(Class<? extends AbstractRichFunction> functionClass) {
 		config.setClass("functionClass", functionClass);
 	}
 
 	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractFunction> getFunctionClass() {
-		return (Class<? extends AbstractFunction>) config.getClass("functionClass", null);
+	public Class<? extends AbstractRichFunction> getFunctionClass() {
+		return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 0e77912..76adf62 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -59,13 +59,6 @@ public abstract class StreamExecutionEnvironment {
 
 	protected JobGraphBuilder jobGraphBuilder;
 
-	/**
-	 * Partitioning strategy on the stream.
-	 */
-	public static enum ConnectionType {
-		SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -113,15 +106,6 @@ public abstract class StreamExecutionEnvironment {
 		this.degreeOfParallelism = degreeOfParallelism;
 	}
 
-	// protected void setMutability(DataStream<?> stream, boolean isMutable) {
-	// jobGraphBuilder.setMutability(stream.getId(), isMutable);
-	// }
-	//
-	// protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout)
-	// {
-	// jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
-	// }
-
 	/**
 	 * Sets the number of hardware contexts (CPU cores / threads) used when
 	 * executed in {@link LocalStreamEnvironment}.
@@ -186,17 +170,17 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param data
 	 *            The collection of elements to create the DataStream from.
-	 * @param <X>
+	 * @param <OUT>
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
-	public <X extends Serializable> DataStream<X> fromElements(X... data) {
-		DataStream<X> returnStream = new DataStream<X>(this, "elements");
+	public <OUT extends Serializable> DataStream<OUT> fromElements(OUT... data) {
+		DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
 
 		try {
-			SourceFunction<X> function = new FromElementsFunction<X>(data);
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<X>(function),
-					new ObjectTypeWrapper<X, Tuple, X>(data[0], null, data[0]), "source",
+			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
+					new ObjectTypeWrapper<OUT, Tuple, OUT>(data[0], null, data[0]), "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
@@ -212,25 +196,25 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param data
 	 *            The collection of elements to create the DataStream from.
-	 * @param <X>
+	 * @param <OUT>
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
 	@SuppressWarnings("unchecked")
-	public <X extends Serializable> DataStream<X> fromCollection(Collection<X> data) {
-		DataStream<X> returnStream = new DataStream<X>(this, "elements");
+	public <OUT extends Serializable> DataStream<OUT> fromCollection(Collection<OUT> data) {
+		DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
 
 		if (data.isEmpty()) {
 			throw new RuntimeException("Collection must not be empty");
 		}
 
 		try {
-			SourceFunction<X> function = new FromElementsFunction<X>(data);
+			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
 			jobGraphBuilder.addSource(
 					returnStream.getId(),
-					new SourceInvokable<X>(new FromElementsFunction<X>(data)),
-					new ObjectTypeWrapper<X, Tuple, X>((X) data.toArray()[0], null, (X) data
+					new SourceInvokable<OUT>(new FromElementsFunction<OUT>(data)),
+					new ObjectTypeWrapper<OUT, Tuple, OUT>((OUT) data.toArray()[0], null, (OUT) data
 							.toArray()[0]), "source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
@@ -259,16 +243,16 @@ public abstract class StreamExecutionEnvironment {
 	 *            the user defined function
 	 * @param parallelism
 	 *            number of parallel instances of the function
-	 * @param <T>
+	 * @param <OUT>
 	 *            type of the returned stream
 	 * @return the data stream constructed
 	 */
-	public <T> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
-		DataStream<T> returnStream = new DataStream<T>(this, "source");
+	public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
+		DataStream<OUT> returnStream = new DataStream<OUT>(this, "source");
 
 		try {
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<T>(function),
-					new FunctionTypeWrapper<T, Tuple, T>(function, SourceFunction.class, 0, -1, 0),
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
+					new FunctionTypeWrapper<OUT, Tuple, OUT>(function, SourceFunction.class, 0, -1, 0),
 					"source", SerializationUtils.serialize(function), parallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
@@ -277,7 +261,7 @@ public abstract class StreamExecutionEnvironment {
 		return returnStream;
 	}
 
-	public <T> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
+	public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> sourceFunction) {
 		return addSource(sourceFunction, 1);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index 6e4d877..8404a80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.streaming.api.function.co;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 
-public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractFunction {
+
+public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction {
 	private static final long serialVersionUID = 1L;
 	
 	public abstract OUT map1(IN1 value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 867c9f8..668837f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,9 +21,9 @@ package org.apache.flink.streaming.api.function.sink;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 
-public abstract class SinkFunction<IN> extends AbstractFunction implements Serializable {
+public abstract class SinkFunction<IN> extends AbstractRichFunction implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 01d4dac..44e3387 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -19,10 +19,10 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.util.Collector;
 
-public abstract class SourceFunction<OUT> extends AbstractFunction {
+public abstract class SourceFunction<OUT> extends AbstractRichFunction {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 92b1ea6..b733362 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.streaming.api.invokable;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@@ -44,4 +45,14 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 			sinkFunction.invoke((IN) reuse.getObject());
 		}
 	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		sinkFunction.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		sinkFunction.close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index c7f0f09..d049bbf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,10 +21,10 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements
-		Serializable {
+public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -40,4 +40,14 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
 	public void invoke() throws Exception {
 		sourceFunction.invoke(collector);
 	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		sourceFunction.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		sourceFunction.close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index c011284..02ee5fd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -21,9 +21,12 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-public abstract class StreamComponentInvokable<OUT> implements Serializable {
+public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction implements
+		Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -42,4 +45,14 @@ public abstract class StreamComponentInvokable<OUT> implements Serializable {
 		this.channelID = channelID;
 	}
 
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		System.out.println("Open not implemented");
+	}
+
+	@Override
+	public void close() throws Exception {
+		System.out.println("Close not implemented");
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 46e79de..61ba5a9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -22,14 +22,13 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 
-public class BatchReduceInvokable<IN, OUT> extends
-		StreamReduceInvokable<IN, OUT> {
+public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private int batchSize;
 
-	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
+	public BatchReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
 		this.reducer = reduceFunction;
 		this.batchSize = batchSize;
 	}
@@ -55,7 +54,7 @@ public class BatchReduceInvokable<IN, OUT> extends
 				tupleBatch.add(reuse.getObject());
 				resetReuse();
 			} while (counter < batchSize);
-			reducer.reduce(tupleBatch.iterator(), collector);
+			reducer.reduce(tupleBatch, collector);
 			tupleBatch.clear();
 			counter = 0;
 		}
@@ -64,11 +63,11 @@ public class BatchReduceInvokable<IN, OUT> extends
 
 	@Override
 	protected void mutableInvoke() throws Exception {
-		BatchIterator<IN> userIterator = new CounterIterator();
+		userIterator = new CounterIterator();
 
 		do {
 			if (userIterator.hasNext()) {
-				reducer.reduce(userIterator, collector);
+				reducer.reduce(userIterable, collector);
 				userIterator.reset();
 			}
 		} while (reuse != null);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index f2b2930..56ab680 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -19,16 +19,17 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	FilterFunction<IN> filterFunction;
+	RichFilterFunction<IN> filterFunction;
 
-	public FilterInvokable(FilterFunction<IN> filterFunction) {
+	public FilterInvokable(RichFilterFunction<IN> filterFunction) {
 		this.filterFunction = filterFunction;
 	}
 
@@ -50,4 +51,14 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 			}
 		}
 	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		filterFunction.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		filterFunction.close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 11e7853..7796230 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -19,15 +19,16 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private FlatMapFunction<IN, OUT> flatMapper;
+	private RichFlatMapFunction<IN, OUT> flatMapper;
 
-	public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
+	public FlatMapInvokable(RichFlatMapFunction<IN, OUT> flatMapper) {
 		this.flatMapper = flatMapper;
 	}
 
@@ -45,4 +46,14 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 			flatMapper.flatMap(reuse.getObject(), collector);
 		}
 	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		flatMapper.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		flatMapper.close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 794d765..23fc31e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -19,15 +19,16 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
 public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private MapFunction<IN, OUT> mapper;
+	private RichMapFunction<IN, OUT> mapper;
 
-	public MapInvokable(MapFunction<IN, OUT> mapper) {
+	public MapInvokable(RichMapFunction<IN, OUT> mapper) {
 		this.mapper = mapper;
 	}
 
@@ -45,4 +46,14 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 			collector.collect(mapper.map(reuse.getObject()));
 		}
 	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		mapper.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		mapper.close();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index a574ebc..1a402a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -19,12 +19,35 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
-public abstract class StreamReduceInvokable<IN, OUT> extends
-		UserTaskInvokable<IN, OUT> {
+public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
-	protected GroupReduceFunction<IN, OUT> reducer;
+	protected RichGroupReduceFunction<IN, OUT> reducer;
+	protected BatchIterator<IN> userIterator;
+	protected BatchIterable userIterable;
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		userIterable = new BatchIterable();
+		reducer.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		reducer.close();
+	}
+
+	protected class BatchIterable implements Iterable<IN> {
+
+		@Override
+		public Iterator<IN> iterator() {
+			return userIterator;
+		}
 
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 7710bd8..430a68e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -22,16 +22,15 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 
-public class WindowReduceInvokable<IN, OUT> extends
-		StreamReduceInvokable<IN, OUT> {
+public class WindowReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private long windowSize;
 	volatile boolean isRunning;
 	boolean window;
 
-	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+	public WindowReduceInvokable(RichGroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
 		this.reducer = reduceFunction;
 		this.windowSize = windowSize;
 		this.window = true;
@@ -56,7 +55,7 @@ public class WindowReduceInvokable<IN, OUT> extends
 				tupleBatch.add(reuse.getObject());
 				resetReuse();
 			} while (System.currentTimeMillis() - startTime < windowSize);
-			reducer.reduce(tupleBatch.iterator(), collector);
+			reducer.reduce(tupleBatch, collector);
 			tupleBatch.clear();
 			startTime = System.currentTimeMillis();
 		}
@@ -64,11 +63,11 @@ public class WindowReduceInvokable<IN, OUT> extends
 	}
 
 	protected void mutableInvoke() throws Exception {
-		BatchIterator<IN> userIterator = new WindowIterator();
+		userIterator = new WindowIterator();
 
 		do {
 			if (userIterator.hasNext()) {
-				reducer.reduce(userIterator, collector);
+				reducer.reduce(userIterable, collector);
 				userIterator.reset();
 			}
 		} while (reuse != null);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index e899367..ac71b22 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 
 public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -53,4 +54,14 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 		} while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
 	}
 
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		mapper.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		mapper.close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index ede30b4..0e03915 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -82,11 +82,13 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
-		TypeInformation<IN1> inputTypeInfo1 = (TypeInformation<IN1>) typeWrapper.getInputTypeInfo1();
+	private void setDeserializers(Object function, Class<? extends AbstractRichFunction> clazz) {
+		TypeInformation<IN1> inputTypeInfo1 = (TypeInformation<IN1>) typeWrapper
+				.getInputTypeInfo1();
 		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
 
-		TypeInformation<IN2> inputTypeInfo2 = (TypeInformation<IN2>) typeWrapper.getInputTypeInfo2();
+		TypeInformation<IN2> inputTypeInfo2 = (TypeInformation<IN2>) typeWrapper
+				.getInputTypeInfo2();
 		inputDeserializer2 = new StreamRecordSerializer(inputTypeInfo2);
 	}
 
@@ -154,7 +156,9 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 			output.initializeSerializers();
 		}
 
+		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
+		userInvokable.close();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 2969e69..df95bda 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -58,7 +58,9 @@ public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
 			LOG.debug("SINK " + name + " invoked");
 		}
 
+		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
+		userInvokable.close();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("SINK " + name + " invoke finished");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 70b8242..6644d6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -73,7 +73,9 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 			output.initializeSerializers();
 		}
 
+		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
+		userInvokable.close();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 3a3f9cf..f809dae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -70,7 +70,9 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 			output.initializeSerializers();
 		}
 
+		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
+		userInvokable.close();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index 02a7554..54471ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -21,21 +21,21 @@ package org.apache.flink.streaming.util.serialization;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class FunctionTypeWrapper<IN1, IN2, OUT> extends
 		TypeSerializerWrapper<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private AbstractFunction function;
-	private Class<? extends AbstractFunction> functionSuperClass;
+	private AbstractRichFunction function;
+	private Class<? extends AbstractRichFunction> functionSuperClass;
 	private int inTypeParameter1;
 	private int inTypeParameter2;
 	private int outTypeParameter;
 
-	public FunctionTypeWrapper(AbstractFunction function,
-			Class<? extends AbstractFunction> functionSuperClass, int inTypeParameter1,
+	public FunctionTypeWrapper(AbstractRichFunction function,
+			Class<? extends AbstractRichFunction> functionSuperClass, int inTypeParameter1,
 			int inTypeParameter2, int outTypeParameter) {
 		this.function = function;
 		this.functionSuperClass = functionSuperClass;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 1ea165f..68403a8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;
@@ -36,8 +36,7 @@ public class IterateTest {
 	private static final long MEMORYSIZE = 32;
 	private static boolean iterated = false;
 
-	public static final class IterationHead extends
-			FlatMapFunction<Boolean, Boolean> {
+	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -53,8 +52,7 @@ public class IterateTest {
 
 	}
 
-	public static final class IterationTail extends
-			FlatMapFunction<Boolean,Boolean> {
+	public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -86,12 +84,12 @@ public class IterateTest {
 		for (int i = 0; i < 100000; i++) {
 			bl.add(false);
 		}
-		DataStream<Boolean> source =  env
-				.fromCollection(bl);
+		DataStream<Boolean> source = env.fromCollection(bl);
 
 		IterativeDataStream<Boolean> iteration = source.iterate();
-				
-		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(new IterationTail());
+
+		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
+				new IterationTail());
 
 		iteration.closeWith(increment).addSink(new MySink());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 438887a..67dce9d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -35,7 +35,7 @@ public class PrintTest {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
-		env.generateSequence(1, 100000).print();
+		env.generateSequence(1, 10).print();
 		env.executeTest(MEMORYSIZE);
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e8742de..e0da783 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -25,7 +25,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -38,7 +38,7 @@ public class DirectedOutputTest {
 	static HashSet<Long> evenSet = new HashSet<Long>();
 	static HashSet<Long> oddSet = new HashSet<Long>();
 	
-	private static class PlusTwo extends MapFunction<Long, Long> {
+	private static class PlusTwo extends RichMapFunction<Long, Long> {
 	
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index d8c7213..c23c9a7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -22,19 +22,18 @@ package org.apache.flink.streaming.api.invokable.operator;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.LogUtils;
-import org.junit.Test;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;
+import org.junit.Test;
 
 public class BatchReduceTest {
 
@@ -44,20 +43,20 @@ public class BatchReduceTest {
 	private static final long MEMORYSIZE = 32;
 
 	public static final class MyBatchReduce extends
-			GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
+			RichGroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterator<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
+		public void reduce(Iterable<Tuple1<Double>> values, Collector<Tuple1<Double>> out)
 				throws Exception {
 
 			Double sum = 0.;
 			Double count = 0.;
-			while (values.hasNext()) {
-				sum += values.next().f0;
+			for (Tuple1<Double> value : values) {
+				sum += value.f0;
 				count++;
 			}
-			if(count>0){
+			if (count > 0) {
 				out.collect(new Tuple1<Double>(sum / count));
 			}
 		}
@@ -87,7 +86,7 @@ public class BatchReduceTest {
 	@Test
 	public void test() throws Exception {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		
+
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALlELISM);
 
 		@SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 3286ef1..2d4fe7a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
@@ -45,7 +45,7 @@ public class FilterTest implements Serializable {
 		}
 	}
 
-	static class MyFilter extends FilterFunction<Integer> {
+	static class MyFilter extends RichFilterFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index b299407..2a8973c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -37,7 +37,7 @@ import org.junit.Test;
 
 public class FlatMapTest {
 
-	public static final class MyFlatMap extends FlatMapFunction<Integer, Integer> {
+	public static final class MyFlatMap extends RichFlatMapFunction<Integer, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -49,7 +49,7 @@ public class FlatMapTest {
 
 	}
 
-	public static final class ParallelFlatMap extends FlatMapFunction<Integer, Integer> {
+	public static final class ParallelFlatMap extends RichFlatMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -60,7 +60,7 @@ public class FlatMapTest {
 
 	}
 
-	public static final class GenerateSequenceFlatMap extends FlatMapFunction<Long, Long> {
+	public static final class GenerateSequenceFlatMap extends RichFlatMapFunction<Long, Long> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 2a35de5..07478fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -84,7 +84,7 @@ public class MapTest {
 		}
 	}
 
-	public static final class MyMap extends MapFunction<Integer, Integer> {
+	public static final class MyMap extends RichMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -94,7 +94,7 @@ public class MapTest {
 		}
 	}
 
-	public static final class MySingleJoinMap extends MapFunction<Integer, Integer> {
+	public static final class MySingleJoinMap extends RichMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -104,7 +104,7 @@ public class MapTest {
 		}
 	}
 
-	public static final class MyMultipleJoinMap extends MapFunction<Integer, Integer> {
+	public static final class MyMultipleJoinMap extends RichMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -114,7 +114,7 @@ public class MapTest {
 		}
 	}
 
-	public static final class MyFieldsMap extends MapFunction<Integer, Integer> {
+	public static final class MyFieldsMap extends RichMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
@@ -128,7 +128,7 @@ public class MapTest {
 		}
 	}
 
-	public static final class MyDiffFieldsMap extends MapFunction<Integer, Integer> {
+	public static final class MyDiffFieldsMap extends RichMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index 89a2c7c..566acd2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -24,17 +24,17 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
 
 public class StreamComponentTest {
 
@@ -59,7 +59,7 @@ public class StreamComponentTest {
 		}
 	}
 
-	public static class MyTask extends MapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
+	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index 4ca191e..7347f62 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -25,14 +25,14 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.junit.Test;
 
 public class TypeSerializationTest {
 
-	private static class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<String>> {
+	private static class MyMap extends RichMapFunction<Tuple1<Integer>, Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -45,7 +45,7 @@ public class TypeSerializationTest {
 	@Test
 	public void functionTypeSerializationTest() {
 		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new FunctionTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
-				new MyMap(), MapFunction.class, 0, -1, 1);
+				new MyMap(), RichMapFunction.class, 0, -1, 1);
 
 		byte[] serializedType = SerializationUtils.serialize(ser);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 0bb722c..04b8ee2 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -19,11 +19,11 @@
 
 package org.apache.flink.streaming.examples.basictopology;
 
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
 public class BasicTopology {
@@ -42,7 +42,7 @@ public class BasicTopology {
 		}
 	}
 
-	public static class BasicMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
+	public static class BasicMap extends RichMapFunction<Tuple1<String>, Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
 		// map to the same tuple

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
index 9c0638d..5704723 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
@@ -21,12 +21,12 @@ package org.apache.flink.streaming.examples.cellinfo;
 
 import java.util.Random;
 
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.util.Collector;
 
 public class CellInfoLocal {
@@ -77,7 +77,7 @@ public class CellInfoLocal {
 	}
 
 	private final static class CellTask extends
-			FlatMapFunction<Tuple4<Boolean, Integer, Long, Integer>, Tuple1<String>> {
+			RichFlatMapFunction<Tuple4<Boolean, Integer, Long, Integer>, Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
 		private WorkerEngineExact engine = new WorkerEngineExact(10, 500,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
index 267d035..9a3672e 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.examples.function;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
 
@@ -34,10 +34,10 @@ import org.apache.sling.commons.json.JSONObject;
  * Type of the returned elements.
  */
 public abstract class JSONParseFlatMap<IN, OUT> extends
-		FlatMapFunction<IN, OUT> {
+		RichFlatMapFunction<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
-	private static final Log LOG = LogFactory.getLog(DataStream.class);
+	private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
 
 	/**
 	 * Get the value of a field in a JSON text.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
index 8287904..c4414d9 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
@@ -22,12 +22,12 @@ package org.apache.flink.streaming.examples.join;
 import java.util.ArrayList;
 import java.util.HashMap;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
 public class JoinTask extends
-		FlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, Integer, Integer>> {
+		RichFlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, Integer, Integer>> {
 	private static final long serialVersionUID = 749913336259789039L;
 
 	private HashMap<String, ArrayList<Integer>> gradeHashmap;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d855639..05a7eb1 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -19,11 +19,11 @@
 
 package org.apache.flink.streaming.examples.ml;
 
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
 public class IncrementalLearningSkeleton {
@@ -69,7 +69,7 @@ public class IncrementalLearningSkeleton {
 	}
 
 	// Task for building up-to-date partial models on new training data
-	public static class PartialModelBuilder extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static class PartialModelBuilder extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -86,7 +86,7 @@ public class IncrementalLearningSkeleton {
 
 	// Task for performing prediction using the model produced in
 	// batch-processing and the up-to-date partial model
-	public static class Predictor extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static class Predictor extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		Tuple1<Integer> batchModel = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
index b4c97c3..1b1cfe1 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
@@ -23,13 +23,13 @@ import java.util.Random;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 public class IncrementalOLS {
@@ -78,7 +78,7 @@ public class IncrementalOLS {
 	}
 
 	public static class PartialModelBuilder extends
-			MapFunction<Tuple2<Double, Double[]>, Tuple2<Boolean, Double[]>> {
+			RichMapFunction<Tuple2<Double, Double[]>, Tuple2<Boolean, Double[]>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -116,7 +116,7 @@ public class IncrementalOLS {
 	}
 
 	// TODO: How do I know the x for which I have predicted y?
-	public static class Predictor extends MapFunction<Tuple2<Boolean, Double[]>, Tuple1<Double>> {
+	public static class Predictor extends RichMapFunction<Tuple2<Boolean, Double[]>, Tuple1<Double>> {
 		private static final long serialVersionUID = 1L;
 
 		// StreamRecord batchModel = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
index a0ed3a4..bf536f4 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
@@ -23,13 +23,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.util.Collector;
 
 public class WindowJoinTask extends
-		FlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
+		RichFlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
 
 	class SalaryProgress {
 		public SalaryProgress(Integer salary, Long progress) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/776bd3f6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
index 53c23d6..7142b58 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
@@ -22,10 +22,10 @@ package org.apache.flink.streaming.examples.wordcount;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-public class WordCountCounter extends MapFunction<String, Tuple2<String, Integer>> {
+public class WordCountCounter extends RichMapFunction<String, Tuple2<String, Integer>> {
 	private static final long serialVersionUID = 1L;
 
 	private Map<String, Integer> wordCounts = new HashMap<String, Integer>();


Mime
View raw message