flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [05/27] incubator-flink git commit: [scala] [streaming] Base functionality added for streaming scala api
Date Sun, 04 Jan 2015 20:50:55 GMT
[scala] [streaming] Base functionality added for streaming scala api


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/34353f66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/34353f66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/34353f66

Branch: refs/heads/master
Commit: 34353f6658e9a4dd50ad860e17eee94804b76ccb
Parents: 87d699d
Author: Gyula Fora <gyfora@apache.org>
Authored: Thu Dec 11 15:22:03 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  37 ++---
 .../flink/streaming/api/StreamConfig.java       |  16 --
 .../api/datastream/ConnectedDataStream.java     |  47 +++---
 .../streaming/api/datastream/DataStream.java    |  70 ++++----
 .../api/datastream/GroupedDataStream.java       |  10 +-
 .../api/datastream/StreamProjection.java        | 161 ++++++++-----------
 .../api/datastream/WindowedDataStream.java      |  14 +-
 .../environment/StreamExecutionEnvironment.java |  37 ++---
 .../function/source/FromElementsFunction.java   |  90 ++++++-----
 .../api/streamvertex/StreamVertex.java          |   3 +-
 flink-scala/pom.xml                             |   6 +
 .../flink/api/scala/streaming/DataStream.scala  |  76 +++++++++
 .../streaming/StreamExecutionEnvironment.scala  | 150 +++++++++++++++++
 13 files changed, 425 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d66e388..f358de9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -72,7 +72,6 @@ public class JobGraphBuilder {
 	private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
 	private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
 	private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
 	private Map<String, Integer> iterationIds;
@@ -104,7 +103,6 @@ public class JobGraphBuilder {
 		typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
 		typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
 		typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
-		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
 		iterationIds = new HashMap<String, Integer>();
@@ -133,18 +131,14 @@ public class JobGraphBuilder {
 	 *            Output type for serialization
 	 * @param operatorName
 	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
 	public <IN, OUT> void addStreamVertex(String vertexName,
 			StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
-			TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction,
-			int parallelism) {
+			TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) {
 
-		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
-				serializedFunction, parallelism);
+		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism);
 
 		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
 				inTypeInfo) : null;
@@ -171,8 +165,6 @@ public class JobGraphBuilder {
 	 *            Output type for serialization
 	 * @param operatorName
 	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
@@ -185,7 +177,7 @@ public class JobGraphBuilder {
 				function);
 
 		addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
-				serializedFunction, parallelism);
+				parallelism);
 	}
 
 	/**
@@ -206,7 +198,7 @@ public class JobGraphBuilder {
 	public void addIterationHead(String vertexName, String iterationHead, Integer iterationID,
 			int parallelism, long waitTime) {
 
-		addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
+		addVertex(vertexName, StreamIterationHead.class, null, null, parallelism);
 
 		iterationIds.put(vertexName, iterationID);
 		iterationIDtoHeadName.put(iterationID, vertexName);
@@ -247,7 +239,7 @@ public class JobGraphBuilder {
 			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
 		}
 
-		addVertex(vertexName, StreamIterationTail.class, null, null, null, parallelism);
+		addVertex(vertexName, StreamIterationTail.class, null, null, parallelism);
 
 		iterationIds.put(vertexName, iterationID);
 		iterationIDtoTailName.put(iterationID, vertexName);
@@ -264,10 +256,9 @@ public class JobGraphBuilder {
 	public <IN1, IN2, OUT> void addCoTask(String vertexName,
 			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
 			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
-			String operatorName, byte[] serializedFunction, int parallelism) {
+			String operatorName, int parallelism) {
 
-		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
-				serializedFunction, parallelism);
+		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
 
 		addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
 				new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
@@ -289,20 +280,16 @@ public class JobGraphBuilder {
 	 *            The user defined invokable object
 	 * @param operatorName
 	 *            Type of the user defined operator
-	 * @param serializedFunction
-	 *            Serialized operator
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
 	private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass,
-			StreamInvokable<?, ?> invokableObject, String operatorName, byte[] serializedFunction,
-			int parallelism) {
+			StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) {
 
 		vertexClasses.put(vertexName, vertexClass);
 		setParallelism(vertexName, parallelism);
 		invokableObjects.put(vertexName, invokableObject);
 		operatorNames.put(vertexName, operatorName);
-		serializedFunctions.put(vertexName, serializedFunction);
 		outEdgeList.put(vertexName, new ArrayList<String>());
 		outEdgeType.put(vertexName, new ArrayList<Integer>());
 		outEdgeNames.put(vertexName, new ArrayList<List<String>>());
@@ -333,8 +320,6 @@ public class JobGraphBuilder {
 		// Get vertex attributes
 		Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
 		StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
-		String operatorName = operatorNames.get(vertexName);
-		byte[] serializedFunction = serializedFunctions.get(vertexName);
 		int parallelism = vertexParallelism.get(vertexName);
 		byte[] outputSelector = outputSelectors.get(vertexName);
 		Map<String, OperatorState<?>> state = operatorStates.get(vertexName);
@@ -362,7 +347,6 @@ public class JobGraphBuilder {
 		// Set vertex config
 		config.setUserInvokable(invokableObject);
 		config.setVertexName(vertexName);
-		config.setFunction(serializedFunction, operatorName);
 		config.setOutputSelector(outputSelector);
 		config.setOperatorStates(state);
 
@@ -522,8 +506,8 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Sets udf operator and TypeSerializerWrapper from one vertex to another,
-	 * used with some sinks.
+	 * Sets TypeSerializerWrapper from one vertex to another, used with some
+	 * sinks.
 	 * 
 	 * @param from
 	 *            from
@@ -532,7 +516,6 @@ public class JobGraphBuilder {
 	 */
 	public void setBytesFrom(String from, String to) {
 		operatorNames.put(to, operatorNames.get(from));
-		serializedFunctions.put(to, serializedFunctions.get(from));
 
 		typeSerializersIn1.put(to, typeSerializersOut1.get(from));
 		typeSerializersIn2.put(to, typeSerializersOut2.get(from));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 9800b63..8837b85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -47,7 +47,6 @@ public class StreamConfig {
 	private static final String OUTPUT_SELECTOR = "outputSelector";
 	private static final String DIRECTED_EMIT = "directedEmit";
 	private static final String FUNCTION_NAME = "operatorName";
-	private static final String FUNCTION = "operator";
 	private static final String VERTEX_NAME = "vertexName";
 	private static final String SERIALIZEDUDF = "serializedudf";
 	private static final String USER_FUNCTION = "userfunction";
@@ -173,21 +172,6 @@ public class StreamConfig {
 		return config.getString(VERTEX_NAME, null);
 	}
 
-	public void setFunction(byte[] serializedFunction, String functionName) {
-		if (serializedFunction != null) {
-			config.setBytes(FUNCTION, serializedFunction);
-			config.setString(FUNCTION_NAME, functionName);
-		}
-	}
-
-	public Object getFunction(ClassLoader cl) {
-		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot deserialize invokable object", e);
-		}
-	}
-
 	public String getFunctionName() {
 		return config.getString(FUNCTION_NAME, "");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index dcc3dab..c663315 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,13 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.io.Serializable;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -53,7 +49,7 @@ import org.apache.flink.util.Collector;
  * The ConnectedDataStream represents a stream for two different data types. It
  * can be used to apply transformations like {@link CoMapFunction} on two
  * {@link DataStream}s
- *
+ * 
  * @param <IN1>
  *            Type of the first input data steam.
  * @param <IN2>
@@ -417,8 +413,9 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
 				coMapper.getClass(), 2, null, null);
 
-		return addCoFunction("coMap", clean(coMapper), outTypeInfo,
-				new CoMapInvokable<IN1, IN2, OUT>(clean(coMapper)));
+		return addCoFunction("coMap", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
+				clean(coMapper)));
+
 	}
 
 	/**
@@ -441,8 +438,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
 				coFlatMapper.getClass(), 2, null, null);
 
-		return addCoFunction("coFlatMap", clean(coFlatMapper), outTypeInfo,
-				new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapper)));
+		return addCoFunction("coFlatMap", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
+				clean(coFlatMapper)));
 	}
 
 	/**
@@ -466,8 +463,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
 				coReducer.getClass(), 2, null, null);
 
-		return addCoFunction("coReduce", clean(coReducer), outTypeInfo,
-				getReduceInvokable(clean(coReducer)));
+		return addCoFunction("coReduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
+
 	}
 
 	/**
@@ -531,9 +528,9 @@ public class ConnectedDataStream<IN1, IN2> {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
 				coWindowFunction.getClass(), 2, null, null);
 
-		return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
-				new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
-						timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
+				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
+
 	}
 
 	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -607,27 +604,21 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("coWindowReduce", clean(coWindowFunction), outTypeInfo,
-				new CoWindowInvokable<IN1, IN2, OUT>(clean(coWindowFunction), windowSize, slideInterval,
-						timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
+				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
+
 	}
 
-	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-			final Function function, TypeInformation<OUT> outTypeInfo,
-			CoInvokable<IN1, IN2, OUT> functionInvokable) {
+	public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+			TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, OUT> functionInvokable) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
 				environment, functionName, outTypeInfo);
 
-		try {
-			dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
-					getInputType1(), getInputType2(), outTypeInfo, functionName,
-					SerializationUtils.serialize((Serializable) function),
-					environment.getDegreeOfParallelism());
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
+		dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
+				getInputType1(), getInputType2(), outTypeInfo, functionName,
+				environment.getDegreeOfParallelism());
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6e8da0a..04929c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,15 +17,11 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -401,8 +397,7 @@ public class DataStream<OUT> {
 
 		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
 
-		return addFunction("map", clean(mapper), getType(), outType, new MapInvokable<OUT, R>(
-				clean(mapper)));
+		return transform("map", outType, new MapInvokable<OUT, R>(clean(mapper)));
 	}
 
 	/**
@@ -423,10 +418,11 @@ public class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
 
-		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType());
+		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
+				getType());
+
+		return transform("flatMap", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
 
-		return addFunction("flatMap", clean(flatMapper), getType(), outType,
-				new FlatMapInvokable<OUT, R>(clean(flatMapper)));
 	}
 
 	/**
@@ -442,8 +438,8 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
 
-		return addFunction("reduce", clean(reducer), getType(), getType(),
-				new StreamReduceInvokable<OUT>(clean(reducer)));
+		return transform("reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer)));
+
 	}
 
 	/**
@@ -461,8 +457,8 @@ public class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		return addFunction("filter", clean(filter), getType(), getType(), new FilterInvokable<OUT>(clean(
-				filter)));
+		return transform("filter", getType(), new FilterInvokable<OUT>(clean(filter)));
+
 	}
 
 	/**
@@ -742,7 +738,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<Long, ?> count() {
 		TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0));
 
-		return addFunction("counter", null, getType(), outTypeInfo, new CounterInvokable<OUT>());
+		return transform("counter", outTypeInfo, new CounterInvokable<OUT>());
 	}
 
 	/**
@@ -1120,8 +1116,7 @@ public class DataStream<OUT> {
 
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", clean(aggregate),
-				typeInfo, typeInfo, invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", typeInfo, invokable);
 
 		return returnStream;
 	}
@@ -1137,34 +1132,28 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Internal function for passing the user defined functions to the JobGraph
-	 * of the job.
-	 * 
-	 * @param functionName
-	 *            name of the function
-	 * @param function
-	 *            the user defined function
-	 * @param functionInvokable
-	 *            the wrapping JobVertex instance
+	 * Method for passing user defined invokables along with the type
+	 * informations that will transform the DataStream.
+	 * 
+	 * @param operatorName
+	 *            name of the operator, for logging purposes
+	 * @param outTypeInfo
+	 *            the output type of the operator
+	 * @param invokable
+	 *            the object containing the transformation logic
 	 * @param <R>
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
-	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
-			final Function function, TypeInformation<OUT> inTypeInfo,
-			TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> functionInvokable) {
+	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+			TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> invokable) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
-				functionName, outTypeInfo);
+				operatorName, outTypeInfo);
 
-		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeInfo,
-					outTypeInfo, functionName,
-					SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo,
+				operatorName, degreeOfParallelism);
 
 		connectGraph(inputStream, returnStream.getId(), 0);
 
@@ -1235,13 +1224,8 @@ public class DataStream<OUT> {
 			SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
 
-		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-					clean(sinkFunction)), inTypeInfo, null, "sink", SerializationUtils
-					.serialize(clean(sinkFunction)), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize SinkFunction");
-		}
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
+				clean(sinkFunction)), inTypeInfo, null, "sink", degreeOfParallelism);
 
 		inputStream.connectGraph(inputStream.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 2620d3e..a2c0f89 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
  * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
  * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
  * get additional functionality by the grouping.
- *
+ * 
  * @param <OUT>
  *            The output type of the {@link GroupedDataStream}.
  */
@@ -62,8 +62,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 	@Override
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("groupReduce", clean(reducer), getType(), getType(),
-				new GroupedReduceInvokable<OUT>(clean(reducer), keySelector));
+		return transform("groupReduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
+				keySelector));
 	}
 
 	/**
@@ -182,8 +182,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
 				keySelector);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", clean(aggregate),
-				typeInfo, typeInfo, invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", typeInfo,
+				invokable);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index cc5f66e..e71b18c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -83,8 +83,8 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
-				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outType));
+		return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple1<T0>>(
+				fieldIndexes, outType));
 	}
 
 	/**
@@ -111,7 +111,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
 	}
 
@@ -141,7 +141,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
 	}
 
@@ -173,7 +173,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
 	}
 
@@ -206,14 +206,14 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
 	}
 
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
 	 * @param type5
 	 *            The class of field '5' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -243,14 +243,14 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
 	}
 
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -266,7 +266,7 @@ public class StreamProjection<IN> {
 	 * @param type6
 	 *            The class of field '6' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction("projection", null, inTypeInfo, outType,
+				.transform("projection", outType,
 						new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
 								outType));
 	}
@@ -291,7 +291,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -309,7 +309,7 @@ public class StreamProjection<IN> {
 	 * @param type7
 	 *            The class of field '7' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
 						outType));
 	}
@@ -333,7 +333,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
 	 * @param type8
 	 *            The class of field '8' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -369,7 +369,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
 						outType));
 	}
@@ -377,7 +377,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -399,7 +399,7 @@ public class StreamProjection<IN> {
 	 * @param type9
 	 *            The class of field '9' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -415,7 +415,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
 						fieldIndexes, outType));
 	}
@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -447,7 +447,7 @@ public class StreamProjection<IN> {
 	 * @param type10
 	 *            The class of field '10' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
 		@SuppressWarnings("unchecked")
 		TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
-		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+		return dataStream.transform("projection", outType,
 				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
 						fieldIndexes, outType));
 	}
@@ -473,7 +473,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -499,7 +499,7 @@ public class StreamProjection<IN> {
 	 * @param type11
 	 *            The class of field '11' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -518,10 +518,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
 								fieldIndexes, outType));
@@ -530,7 +528,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -558,7 +556,7 @@ public class StreamProjection<IN> {
 	 * @param type12
 	 *            The class of field '12' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -577,10 +575,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
 								fieldIndexes, outType));
@@ -589,7 +585,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -619,7 +615,7 @@ public class StreamProjection<IN> {
 	 * @param type13
 	 *            The class of field '13' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -638,10 +634,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
 								fieldIndexes, outType));
@@ -650,7 +644,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -682,7 +676,7 @@ public class StreamProjection<IN> {
 	 * @param type14
 	 *            The class of field '14' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -702,10 +696,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
 								fieldIndexes, outType));
@@ -714,7 +706,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -748,7 +740,7 @@ public class StreamProjection<IN> {
 	 * @param type15
 	 *            The class of field '15' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -768,10 +760,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
 								fieldIndexes, outType));
@@ -780,7 +770,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -816,7 +806,7 @@ public class StreamProjection<IN> {
 	 * @param type16
 	 *            The class of field '16' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -836,10 +826,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
 								fieldIndexes, outType));
@@ -848,7 +836,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -886,7 +874,7 @@ public class StreamProjection<IN> {
 	 * @param type17
 	 *            The class of field '17' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -906,10 +894,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
 								fieldIndexes, outType));
@@ -918,7 +904,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -958,7 +944,7 @@ public class StreamProjection<IN> {
 	 * @param type18
 	 *            The class of field '18' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -979,10 +965,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = (TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
 								fieldIndexes, outType));
@@ -991,7 +975,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1033,7 +1017,7 @@ public class StreamProjection<IN> {
 	 * @param type19
 	 *            The class of field '19' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1054,10 +1038,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = (TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
 								fieldIndexes, outType));
@@ -1066,7 +1048,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1110,7 +1092,7 @@ public class StreamProjection<IN> {
 	 * @param type20
 	 *            The class of field '20' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1132,10 +1114,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = (TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
 								fieldIndexes, outType));
@@ -1144,7 +1124,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1190,7 +1170,7 @@ public class StreamProjection<IN> {
 	 * @param type21
 	 *            The class of field '21' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1212,10 +1192,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = (TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
 								fieldIndexes, outType));
@@ -1224,7 +1202,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1272,7 +1250,7 @@ public class StreamProjection<IN> {
 	 * @param type22
 	 *            The class of field '22' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1295,10 +1273,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = (TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
 								fieldIndexes, outType));
@@ -1307,7 +1283,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1357,7 +1333,7 @@ public class StreamProjection<IN> {
 	 * @param type23
 	 *            The class of field '23' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1380,10 +1356,8 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType = (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
 						outType,
 						new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
 								fieldIndexes, outType));
@@ -1392,7 +1366,7 @@ public class StreamProjection<IN> {
 	/**
 	 * Projects a {@link Tuple} {@link DataStream} to the previously selected
 	 * fields. Requires the classes of the fields of the resulting Tuples.
-	 *
+	 * 
 	 * @param type0
 	 *            The class of field '0' of the result Tuples.
 	 * @param type1
@@ -1444,7 +1418,7 @@ public class StreamProjection<IN> {
 	 * @param type24
 	 *            The class of field '24' of the result Tuples.
 	 * @return The projected DataStream.
-	 *
+	 * 
 	 * @see Tuple
 	 * @see DataStream
 	 */
@@ -1467,10 +1441,9 @@ public class StreamProjection<IN> {
 		TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) extractFieldTypes(
 				fieldIndexes, types, inTypeInfo);
 		return dataStream
-				.addFunction(
+				.transform(
 						"projection",
-						null,
-						inTypeInfo,
+
 						outType,
 						new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
 								fieldIndexes, outType));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 788f28d..cb9cd04 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -50,7 +50,7 @@ import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
  * into windows (predefined chunks). User defined function such as
  * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} or
  * aggregations can be applied to the windows.
- *
+ * 
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
  */
@@ -124,8 +124,8 @@ public class WindowedDataStream<OUT> {
 		this.userEvicters = windowedDataStream.userEvicters;
 		this.allCentral = windowedDataStream.allCentral;
 	}
-	
-	public <F> F clean(F f){
+
+	public <F> F clean(F f) {
 		return dataStream.clean(f);
 	}
 
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), getType(), getType(),
+		return dataStream.transform("NextGenWindowReduce", getType(),
 				getReduceInvokable(reduceFunction));
 	}
 
@@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor
 				.getGroupReduceReturnTypes(reduceFunction, inType);
 
-		return dataStream.addFunction("NextGenWindowReduce", clean(reduceFunction), inType, outType,
+		return dataStream.transform("NextGenWindowReduce", outType,
 				getReduceGroupInvokable(reduceFunction));
 	}
 
@@ -457,8 +457,8 @@ public class WindowedDataStream<OUT> {
 	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
 		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
-				clean(aggregator), getType(), getType(), invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("windowReduce",
+				getType(), invokable);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d26b714..f50ab91 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,8 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -264,14 +262,10 @@ public abstract class StreamExecutionEnvironment {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
 				outTypeInfo);
 
-		try {
-			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
-					SerializationUtils.serialize(function), 1);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize elements");
-		}
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+				null, outTypeInfo, "source", 1);
+
 		return returnStream;
 	}
 
@@ -300,15 +294,8 @@ public abstract class StreamExecutionEnvironment {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
 				outTypeInfo);
 
-		try {
-			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
-					new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source",
-					SerializationUtils.serialize(function), 1);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize collection");
-		}
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
+				new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source", 1);
 
 		return returnStream;
 	}
@@ -317,7 +304,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
 	 * character set.
-	 *
+	 * 
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
@@ -335,7 +322,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
 	 * character set, uses '\n' as delimiter.
-	 *
+	 * 
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
@@ -378,12 +365,8 @@ public abstract class StreamExecutionEnvironment {
 
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
 
-		try {
-			jobGraphBuilder.addSourceVertex(returnStream.getId(), function, null, outTypeInfo,
-					"source", SerializationUtils.serialize(function), getDegreeOfParallelism());
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize SourceFunction");
-		}
+		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+				null, outTypeInfo, "source", 1);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index cb960dd..8afac75 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -1,45 +1,49 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> implements SourceFunction<T> {
-	private static final long serialVersionUID = 1L;
-
-	Iterable<T> iterable;
-
-	public FromElementsFunction(T... elements) {
-		this.iterable = Arrays.asList(elements);
-	}
-
-	public FromElementsFunction(Collection<T> elements) {
-		this.iterable = elements;
-	}
-
-	@Override
-	public void invoke(Collector<T> collector) throws Exception {
-		for (T element : iterable) {
-			collector.collect(element);
-		}
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+	private static final long serialVersionUID = 1L;
+
+	Iterable<T> iterable;
+
+	public FromElementsFunction(T... elements) {
+		this.iterable = Arrays.asList(elements);
+	}
+
+	public FromElementsFunction(Collection<T> elements) {
+		this.iterable = elements;
+	}
+
+	public FromElementsFunction(Iterable<T> elements) {
+		this.iterable = elements;
+	}
+
+	@Override
+	public void invoke(Collector<T> collector) throws Exception {
+		for (T element : iterable) {
+			collector.collect(element);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 7504efd..d786d6b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -37,7 +37,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected int instanceID;
 	protected String name;
 	private static int numVertices = 0;
-	protected Object function;
+
 	protected String functionName;
 
 	private InputHandler<IN> inputHandler;
@@ -72,7 +72,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getVertexName();
 		this.functionName = configuration.getFunctionName();
-		this.function = configuration.getFunction(userClassLoader);
 		this.states = configuration.getOperatorStates(userClassLoader);
 		this.context = createRuntimeContext(name, this.states);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index b902655..4ccbb98 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,6 +50,12 @@ under the License.
 			<artifactId>flink-compiler</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+        
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
 		<dependency>
 			<groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
new file mode 100644
index 0000000..711ce7c
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+
+class DataStream[OUT](javaStream: JavaStream[OUT]) {
+
+  /* This code is originally from the Apache Spark project. */
+  /**
+   * Clean a closure to make it ready to be serialized and sent to tasks
+   * (removes unreferenced variables in $outer's, updates REPL variables)
+   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
+   * if not.
+   *
+   * NOTE(review): doc inherited from Spark — Flink's ClosureCleaner may throw
+   * a different exception type than SparkException; confirm.
+   *
+   * @param f the closure to clean
+   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
+   * @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
+   *   serializable
+   */
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   *
+   * @param fun the mapping function applied to each element
+   * @return the transformed [[DataStream]]
+   * @throws NullPointerException if <tt>fun</tt> is null
+   */
+  def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    // Wrap the Scala function in a Java MapFunction. The closure is cleaned
+    // eagerly (at graph-construction time) so serialization problems surface
+    // before the job is submitted.
+    val mapper = new MapFunction[OUT, R] {
+      val cleanFun = clean(fun)
+      def map(in: OUT): R = cleanFun(in)
+    }
+
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   *
+   * Overload for callers that already hold a Java [[MapFunction]] instance;
+   * note that, unlike the function-literal overload, this one does not
+   * closure-clean the supplied mapper.
+   *
+   * @param mapper the MapFunction applied to each element
+   * @return the transformed [[DataStream]]
+   * @throws NullPointerException if <tt>mapper</tt> is null
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper)))
+  }
+
+  /** Delegates to the underlying Java DataStream's print() sink. */
+  def print() = javaStream.print()
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/34353f66/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..df6c561
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv }
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.commons.lang.Validate
+import scala.reflect.ClassTag
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.invokable.SourceInvokable
+import org.apache.flink.streaming.api.function.source.FromElementsFunction
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations using
+   * [[DataStream.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Creates a DataStream of the sequence of numbers from <tt>from</tt> to <tt>to</tt>,
+   * delegating to the Java environment (hence the boxed <tt>java.lang.Long</tt>
+   * element type).
+   */
+  def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to))
+
+  /**
+   * Creates a new data stream that contains the given elements. The elements must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag: TypeInformation](
+    data: Seq[T]): DataStream[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    val returnStream = new DataStreamSource[T](javaEnv,
+      "elements", typeInfo);
+
+    // Wire the source vertex into the job graph directly, with parallelism 1
+    // (the collection is emitted from a single, non-parallel source).
+    // NOTE(review): scala.collection.JavaConversions is deprecated in later
+    // Scala versions; JavaConverters' asJava would be the preferred conversion.
+    javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
+      new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))), null, typeInfo,
+      "source", 1);
+    new DataStream(returnStream)
+  }
+
+  /** Triggers execution of the streaming program (delegates to the Java environment). */
+  def execute() = javaEnv.execute()
+
+}
+
+object StreamExecutionEnvironment {
+
+  /**
+   * Creates an execution environment that represents the context in which the program is
+   * currently executed. If the program is invoked standalone, this method returns a local
+   * execution environment. If the program is invoked from within the command line client
+   * to be submitted to a cluster, this method returns the execution environment of this cluster.
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   */
+  def createLocalEnvironment(
+    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program must be accessible from
+   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism]].
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses
+   *                 user-defined functions, user-defined input formats, or any libraries,
+   *                 those must be
+   *                 provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The execution will use the specified degree of parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses
+   *                 user-defined functions, user-defined input formats, or any libraries,
+   *                 those must be
+   *                 provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+    host: String,
+    port: Int,
+    degreeOfParallelism: Int,
+    jarFiles: String*): StreamExecutionEnvironment = {
+    // Parallelism is applied to the wrapped Java environment before wrapping,
+    // so the returned Scala environment already carries the requested DOP.
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new StreamExecutionEnvironment(javaEnv)
+  }
+}
\ No newline at end of file


Mime
View raw message