flink-commits mailing list archives

From aljos...@apache.org
Subject [5/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator
Date Wed, 19 Aug 2015 16:42:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 4de368c..6b12013 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,40 +17,41 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.List;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import java.util.Collection;
 
 /**
- * The iterative data stream represents the start of an iteration in a
- * {@link DataStream}.
+ * The iterative data stream represents the start of an iteration in a {@link DataStream}.
  * 
- * @param <IN>
- *            Type of the DataStream
+ * @param <T> Type of the elements in this Stream
  */
-public class IterativeDataStream<IN> extends
-		SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
-	
-	protected boolean closed = false;
+public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
 
-	static Integer iterationCount = 0;
+	// We store these so that we can create a co-iteration if we need to
+	private DataStream<T> originalInput;
+	private long maxWaitTime;
 	
-	protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
-		super(dataStream);
+	protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
+		super(dataStream.getExecutionEnvironment(),
+				new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
+		this.originalInput = dataStream;
+		this.maxWaitTime = maxWaitTime;
 		setBufferTimeout(dataStream.environment.getBufferTimeout());
-		iterationID = iterationCount;
-		iterationCount++;
-		iterationWaitTime = maxWaitTime;
 	}
 
 	/**
 	 * Closes the iteration. This method defines the end of the iterative
-	 * program part that will be fed back to the start of the iteration. </br>
-	 * </br>A common usage pattern for streaming iterations is to use output
+	 * program part that will be fed back to the start of the iteration.
+	 *
+	 * <p>
+	 * A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
 	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 	 * for more information.
@@ -58,50 +59,30 @@ public class IterativeDataStream<IN> extends
 	 * @param feedbackStream
 	 *            {@link DataStream} that will be used as input to the iteration
 	 *            head.
-	 * @param keepPartitioning
-	 *            If true the feedback partitioning will be kept as it is (not
-	 *            changed to match the input of the iteration head)
+	 *
 	 * @return The feedback stream.
 	 * 
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
-		
-		if (closed) {
-			throw new IllegalStateException(
-					"An iterative data stream can only be closed once. Use union to close with multiple stream.");
+	public DataStream<T> closeWith(DataStream<T> feedbackStream) {
+
+		Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+		if (!predecessors.contains(this.transformation)) {
+			throw new UnsupportedOperationException(
+					"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
 		}
-		closed = true;
-		
-		streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID,
-				keepPartitioning);
 
-		return iterationTail;
-	}
-	
-	/**
-	 * Closes the iteration. This method defines the end of the iterative
-	 * program part that will be fed back to the start of the iteration. </br>
-	 * </br>A common usage pattern for streaming iterations is to use output
-	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
-	 * for more information.
-	 * 
-	 * 
-	 * @param feedbackStream
-	 *            {@link DataStream} that will be used as input to the
-	 *            iteration head.
-	 * @return The feedback stream.
-	 * 
-	 */
-	public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
-		return closeWith(iterationTail,false);
+		((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
+
+		return feedbackStream;
 	}
 
 	/**
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
 	 * {@link ConnectedDataStream}.
+	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
@@ -109,7 +90,7 @@ public class IterativeDataStream<IN> extends
 	 *            String describing the type information of the feedback stream.
 	 * @return A {@link ConnectedIterativeDataStream}.
 	 */
-	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(String feedbackTypeString) {
+	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(String feedbackTypeString) {
 		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
 	}
 
@@ -117,6 +98,7 @@ public class IterativeDataStream<IN> extends
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
 	 * {@link ConnectedDataStream}.
+	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
@@ -124,7 +106,7 @@ public class IterativeDataStream<IN> extends
 	 *            Class of the elements in the feedback stream.
 	 * @return A {@link ConnectedIterativeDataStream}.
 	 */
-	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(Class<F> feedbackTypeClass) {
+	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
 		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
 	}
 
@@ -132,6 +114,7 @@ public class IterativeDataStream<IN> extends
 	 * Changes the feedback type of the iteration and allows the user to apply
 	 * co-transformations on the input and feedback stream, as in a
 	 * {@link ConnectedDataStream}.
+	 *
 	 * <p>
 	 * For type safety the user needs to define the feedback type
 	 * 
@@ -139,9 +122,8 @@ public class IterativeDataStream<IN> extends
 	 *            The type information of the feedback stream.
 	 * @return A {@link ConnectedIterativeDataStream}.
 	 */
-	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) {
-		return new ConnectedIterativeDataStream<IN, F>(new IterativeDataStream<IN>(this,
-				iterationWaitTime), feedbackType);
+	public <F> ConnectedIterativeDataStream<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
+		return new ConnectedIterativeDataStream<T, F>(originalInput, feedbackType, maxWaitTime);
 	}
 	
 	/**
@@ -149,6 +131,7 @@ public class IterativeDataStream<IN> extends
 	 * iterative part of a streaming program, where the original input of the
 	 * iteration and the feedback of the iteration are connected as in a
 	 * {@link ConnectedDataStream}.
+	 *
 	 * <p>
 	 * The user can distinguish between the two inputs using co-transformation,
 	 * thus eliminating the need for mapping the inputs and outputs to a common
@@ -161,38 +144,18 @@ public class IterativeDataStream<IN> extends
 	 */
 	public static class ConnectedIterativeDataStream<I, F> extends ConnectedDataStream<I, F>{
 
-		private IterativeDataStream<I> input;
-		private TypeInformation<F> feedbackType;
+		private CoFeedbackTransformation<F> coFeedbackTransformation;
 
-		public ConnectedIterativeDataStream(IterativeDataStream<I> input, TypeInformation<F> feedbackType) {
-			super(input, null);
-			this.input = input;
-			this.feedbackType = feedbackType;
+		public ConnectedIterativeDataStream(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
+			super(input.getExecutionEnvironment(),
+					input,
+					new DataStream<F>(input.getExecutionEnvironment(),
+							new CoFeedbackTransformation<F>(input.getParallelism(),
+									feedbackType,
+									waitTime)));
+			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecond().getTransformation();
 		}
-		
-		@Override
-		public TypeInformation<F> getType2() {
-			return feedbackType;
-		}
-		
-		@Override
-		public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
-				TypeInformation<OUT> outTypeInfo, TwoInputStreamOperator<I, F, OUT> operator) {
 
-			@SuppressWarnings({ "unchecked", "rawtypes" })
-			SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-					input.environment, outTypeInfo, operator);
-
-			input.streamGraph.addCoOperator(returnStream.getId(), operator, input.getType(),
-					feedbackType, outTypeInfo, functionName);
-
-			input.connectGraph(input, returnStream.getId(), 1);
-			
-			input.addIterationSource(returnStream, feedbackType);
-
-			return returnStream;
-		}
-		
 		/**
 		 * Closes the iteration. This method defines the end of the iterative
 		 * program part that will be fed back to the start of the iteration as
@@ -206,14 +169,16 @@ public class IterativeDataStream<IN> extends
 		 */
 		@SuppressWarnings({ "rawtypes", "unchecked" })
 		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-			if (input.closed) {
-				throw new IllegalStateException(
-						"An iterative data stream can only be closed once. Use union to close with multiple stream.");
+
+			Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+			if (!predecessors.contains(this.coFeedbackTransformation)) {
+				throw new UnsupportedOperationException(
+						"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
 			}
-			input.closed = true;
-			
-			input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams,
-					input.iterationID, true);
+
+			coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
+
 			return feedbackStream;
 		}
 		

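To illustrate the reworked iteration API above, here is a minimal, hypothetical driver program. It assumes the DataStream#iterate(long) entry point and the usual map/filter operators of this Flink version; the pipeline itself is made up for illustration.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> source = env.generateSequence(0, 1000);

		// The iteration head is now backed by a FeedbackTransformation.
		IterativeDataStream<Long> iteration = source.iterate(5000);

		DataStream<Long> body = iteration.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) {
				return value + 1;
			}
		});

		// closeWith() adds a feedback edge and rejects feedback streams that do
		// not originate from this iteration (checked via transitive predecessors).
		iteration.closeWith(body.filter(new FilterFunction<Long>() {
			@Override
			public boolean filter(Long value) {
				return value < 100;
			}
		}));

		env.execute("iteration sketch");
	}
}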
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index b944302..7628815 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -19,7 +19,11 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
@@ -28,11 +32,10 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
  * 
  * 
- * @param <OUT>
- *            The output type of the {@link KeyedDataStream}.
+ * @param <T> The type of the elements in the Keyed Stream
  */
-public class KeyedDataStream<OUT> extends DataStream<OUT> {
-	KeySelector<OUT, ?> keySelector;
+public class KeyedDataStream<T> extends DataStream<T> {
+	KeySelector<T, ?> keySelector;
 
 	/**
 	 * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
@@ -43,35 +46,35 @@ public class KeyedDataStream<OUT> extends DataStream<OUT> {
 	 * @param keySelector
 	 *            Function for determining state partitions
 	 */
-	public KeyedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
-		super(dataStream.partitionByHash(keySelector));
+	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> keySelector) {
+		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
 		this.keySelector = keySelector;
 	}
 
-	protected KeyedDataStream(KeyedDataStream<OUT> dataStream) {
-		super(dataStream);
-		this.keySelector = dataStream.keySelector;
-	}
-
-	public KeySelector<OUT, ?> getKeySelector() {
+	public KeySelector<T, ?> getKeySelector() {
 		return this.keySelector;
 	}
 
 	@Override
-	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
 		throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
 	}
 
 	@Override
-	public KeyedDataStream<OUT> copy() {
-		return new KeyedDataStream<OUT>(this);
-	}
-
-	@Override
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, OneInputStreamOperator<OUT, R> operator) {
+			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
 		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
-		streamGraph.setKey(returnStream.getId(), keySelector);
+
+		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(
+				keySelector);
 		return returnStream;
 	}
+
+	@Override
+	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+		DataStreamSink<T> result = super.addSink(sinkFunction);
+		result.getTransformation().setStateKeySelector(keySelector);
+		return result;
+	}
 }
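
For illustration, a small, hypothetical program using the reworked KeyedDataStream. The public constructor shown above is used directly; the source, key selector, and downstream map are illustrative.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedStreamSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<String, Integer>> input = env.fromElements(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("b", 2));

		// Hash-partitions the input via a PartitionTransformation and remembers
		// the key selector, which transform()/addSink() then set as the state
		// key selector on the resulting transformation.
		KeyedDataStream<Tuple2<String, Integer>> keyed =
				new KeyedDataStream<Tuple2<String, Integer>>(input,
						new KeySelector<Tuple2<String, Integer>, String>() {
							@Override
							public String getKey(Tuple2<String, Integer> value) {
								return value.f0;
							}
						});

		keyed.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
			@Override
			public Integer map(Tuple2<String, Integer> value) {
				return value.f1;
			}
		}).print();

		env.execute("keyed stream sketch");
	}
}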

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b4a99c8..016cf5e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -24,23 +24,23 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
  * applied on a {@link DataStream} with one predefined output type.
- * 
- * @param <OUT>
- *            Output type of the operator.
- * @param <O>
- *            Type of the operator.
+ *
+ * @param <T> The type of the elements in this Stream
+ * @param <O> Type of the operator.
  */
-public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
-		DataStream<OUT> {
+public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
 
-	protected boolean isSplit;
-	protected StreamOperator<?> operator;
+	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
+		super(environment, transformation);
+	}
 
 	/**
 	 * Gets the name of the current data stream. This name is
@@ -48,8 +48,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *
 	 * @return Name of the stream.
 	 */
-	public String getName(){
-		return streamGraph.getStreamNode(getId()).getOperatorName();
+	public String getName() {
+		return transformation.getName();
 	}
 
 	/**
@@ -58,27 +58,11 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *
 	 * @return The named operator.
 	 */
-	public DataStream<OUT> name(String name){
-		streamGraph.getStreamNode(id).setOperatorName(name);
+	public SingleOutputStreamOperator<T, O> name(String name){
+		transformation.setName(name);
 		return this;
 	}
 
-	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
-			TypeInformation<OUT> outTypeInfo, StreamOperator<?> operator) {
-		super(environment, outTypeInfo);
-		this.isSplit = false;
-		this.operator = operator;
-	}
-
-	@SuppressWarnings("unchecked")
-	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
-		super(dataStream);
-		if (dataStream instanceof SingleOutputStreamOperator) {
-			this.isSplit = ((SingleOutputStreamOperator<OUT, ?>) dataStream).isSplit;
-			this.operator = ((SingleOutputStreamOperator<OUT, ?>) dataStream).operator;
-		}
-	}
-
 	/**
 	 * Sets the parallelism for this operator. The degree must be 1 or more.
 	 * 
@@ -86,13 +70,12 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The parallelism for this operator.
 	 * @return The operator with set parallelism.
 	 */
-	public SingleOutputStreamOperator<OUT, O> setParallelism(int parallelism) {
+	public SingleOutputStreamOperator<T, O> setParallelism(int parallelism) {
 		if (parallelism < 1) {
 			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
 		}
-		this.parallelism = parallelism;
 
-		streamGraph.setParallelism(id, parallelism);
+		transformation.setParallelism(parallelism);
 
 		return this;
 	}
@@ -105,39 +88,34 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The maximum time between two output flushes.
 	 * @return The operator with buffer timeout set.
 	 */
-	public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
-		streamGraph.setBufferTimeout(id, timeoutMillis);
+	public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis) {
+		transformation.setBufferTimeout(timeoutMillis);
 		return this;
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> broadcast() {
-		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+	public SingleOutputStreamOperator<T, O> broadcast() {
+		return (SingleOutputStreamOperator<T, O>) super.broadcast();
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> shuffle() {
-		return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+	public SingleOutputStreamOperator<T, O> shuffle() {
+		return (SingleOutputStreamOperator<T, O>) super.shuffle();
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> forward() {
-		return (SingleOutputStreamOperator<OUT, O>) super.forward();
+	public SingleOutputStreamOperator<T, O> forward() {
+		return (SingleOutputStreamOperator<T, O>) super.forward();
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> rebalance() {
-		return (SingleOutputStreamOperator<OUT, O>) super.rebalance();
+	public SingleOutputStreamOperator<T, O> rebalance() {
+		return (SingleOutputStreamOperator<T, O>) super.rebalance();
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> global() {
-		return (SingleOutputStreamOperator<OUT, O>) super.global();
-	}
-
-	@Override
-	public SingleOutputStreamOperator<OUT, O> copy() {
-		return new SingleOutputStreamOperator<OUT, O>(this);
+	public SingleOutputStreamOperator<T, O> global() {
+		return (SingleOutputStreamOperator<T, O>) super.global();
 	}
 
 	/**
@@ -149,8 +127,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The selected {@link ChainingStrategy}
 	 * @return The operator with the modified chaining strategy
 	 */
-	private SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) {
-		this.operator.setChainingStrategy(strategy);
+	private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
+		this.transformation.setChainingStrategy(strategy);
 		return this;
 	}
 
@@ -162,7 +140,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * 
 	 * @return The operator with chaining disabled
 	 */
-	public SingleOutputStreamOperator<OUT, O> disableChaining() {
+	public SingleOutputStreamOperator<T, O> disableChaining() {
 		return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER);
 	}
 
@@ -173,7 +151,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * 
 	 * @return The operator with chaining set.
 	 */
-	public SingleOutputStreamOperator<OUT, O> startNewChain() {
+	public SingleOutputStreamOperator<T, O> startNewChain() {
 		return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.HEAD);
 	}
 
@@ -216,7 +194,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		if (typeInfoString == null) {
 			throw new IllegalArgumentException("Type information string must not be null.");
 		}
-		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
+		return returns(TypeInfoParser.<T>parse(typeInfoString));
 	}
 	
 	/**
@@ -243,11 +221,11 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            type information as a return type hint
 	 * @return This operator with a given return type hint.
 	 */
-	public O returns(TypeInformation<OUT> typeInfo) {
+	public O returns(TypeInformation<T> typeInfo) {
 		if (typeInfo == null) {
 			throw new IllegalArgumentException("Type information must not be null.");
 		}
-		fillInType(typeInfo);
+		transformation.setOutputType(typeInfo);
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;
 		return returnType;
@@ -277,13 +255,13 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * @return This operator with a given return type hint.
 	 */
 	@SuppressWarnings("unchecked")
-	public O returns(Class<OUT> typeClass) {
+	public O returns(Class<T> typeClass) {
 		if (typeClass == null) {
 			throw new IllegalArgumentException("Type class must not be null.");
 		}
 		
 		try {
-			TypeInformation<OUT> ti = (TypeInformation<OUT>) TypeExtractor.createTypeInfo(typeClass);
+			TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass);
 			return returns(ti);
 		}
 		catch (InvalidTypesException e) {
@@ -291,6 +269,11 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		}
 	}
 
+	@Override
+	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+		return new SingleOutputStreamOperator<T, O>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
+	}
+
 	/**
 	 * By default all operators in a streaming job share the same resource
 	 * group. Each resource group takes as many task manager slots as the
@@ -305,8 +288,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * 
 	 * @return The operator as a part of a new resource group.
 	 */
-	public SingleOutputStreamOperator<OUT, O> startNewResourceGroup() {
-		streamGraph.setResourceStrategy(getId(), ResourceStrategy.NEWGROUP);
+	public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
+		transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
 		return this;
 	}
 
@@ -319,8 +302,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * 
 	 * @return The operator with isolated resource group.
 	 */
-	public SingleOutputStreamOperator<OUT, O> isolateResources() {
-		streamGraph.setResourceStrategy(getId(), ResourceStrategy.ISOLATE);
+	public SingleOutputStreamOperator<T, O> isolateResources() {
+		transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
 		return this;
 	}
 

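A short, hypothetical example of the new delegation: the operator settings below are now recorded on the underlying StreamTransformation instead of a StreamGraph node. The map pipeline itself is illustrative.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorSettingsSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		SingleOutputStreamOperator<String, ?> mapped = env.generateSequence(0, 10)
				.map(new MapFunction<Long, String>() {
					@Override
					public String map(Long value) {
						return String.valueOf(value);
					}
				})
				.name("long-to-string")      // stored via transformation.setName(...)
				.setParallelism(2)           // stored via transformation.setParallelism(...)
				.setBufferTimeout(100);      // stored via transformation.setBufferTimeout(...)

		mapped.print();
		env.execute("operator settings sketch");
	}
}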
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 6b95fe7..bc9ecfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -17,23 +17,23 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.Arrays;
-
+import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
 
 /**
  * The SplitDataStream represents an operator that has been split using an
  * {@link OutputSelector}. Named outputs can be selected using the
  * {@link #select} function. To apply transformation on the whole output simply
  * call the transformation on the SplitDataStream
- * 
- * @param <OUT>
- *            The type of the output.
+ *
+ * @param <OUT> The type of the elements in the Stream
  */
 public class SplitDataStream<OUT> extends DataStream<OUT> {
 
-	protected SplitDataStream(DataStream<OUT> dataStream) {
-		super(dataStream);
+	protected SplitDataStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
+		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
 	}
 
 	/**
@@ -55,12 +55,8 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 			}
 		}
 
-		DataStream<OUT> returnStream = copy();
-
-		for (DataStream<OUT> ds : returnStream.unionedStreams) {
-			ds.selectedNames = Arrays.asList(outputNames);
-		}
-		return returnStream;
+		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
+		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
 	}
 
 }
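
A hypothetical sketch of the split/select path after this change: select() now layers a SelectTransformation on top of the SplitTransformation instead of copying the stream. DataStream#split and the even/odd selector are illustrative assumptions.

import java.util.Arrays;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitSelectSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		SplitDataStream<Long> split = env.generateSequence(0, 100)
				.split(new OutputSelector<Long>() {
					@Override
					public Iterable<String> select(Long value) {
						return Arrays.asList(value % 2 == 0 ? "even" : "odd");
					}
				});

		DataStream<Long> even = split.select("even"); // SelectTransformation("even")
		DataStream<Long> odd = split.select("odd");   // SelectTransformation("odd")

		even.print();
		odd.print();
		env.execute("split/select sketch");
	}
}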

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 9565f4b..bf3a11a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -99,7 +99,7 @@ public class WindowedDataStream<OUT> {
 	protected EvictionPolicy<OUT> userEvicter;
 
 	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) {
-		this.dataStream = dataStream.copy();
+		this.dataStream = dataStream;
 		this.triggerHelper = policyHelper;
 
 		if (dataStream instanceof GroupedDataStream) {
@@ -109,7 +109,7 @@ public class WindowedDataStream<OUT> {
 
 	protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger,
 			EvictionPolicy<OUT> evicter) {
-		this.dataStream = dataStream.copy();
+		this.dataStream = dataStream;
 
 		this.userTrigger = trigger;
 		this.userEvicter = evicter;
@@ -120,7 +120,7 @@ public class WindowedDataStream<OUT> {
 	}
 
 	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
-		this.dataStream = windowedDataStream.dataStream.copy();
+		this.dataStream = windowedDataStream.dataStream;
 		this.discretizerKey = windowedDataStream.discretizerKey;
 		this.groupByKey = windowedDataStream.groupByKey;
 		this.triggerHelper = windowedDataStream.triggerHelper;

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
index 3dd02a3..e0aafb7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
@@ -30,10 +30,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.co.CrossWindowFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
 
 public class StreamCrossOperator<I1, I2> extends
-		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
+		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2, Tuple2<I1, I2>>> {
 
 	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
 		super(input1, input2);
@@ -48,37 +47,42 @@ public class StreamCrossOperator<I1, I2> extends
 	}
 
 	@Override
-	protected CrossWindow<I1, I2> createNextWindowOperator() {
+	protected CrossWindow<I1, I2, Tuple2<I1, I2>> createNextWindowOperator() {
 
 		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
 				clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
 
-		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
+		return new CrossWindow<I1, I2, Tuple2<I1, I2>>(this, input1.connect(input2).addGeneralWindowCombine(
 				crossWindowFunction,
 				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
 				slideInterval, timeStamp1, timeStamp2));
 	}
 
-	public static class CrossWindow<I1, I2> extends
-			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements
-			TemporalWindow<CrossWindow<I1, I2>> {
+	public static class CrossWindow<I1, I2, R> extends
+			SingleOutputStreamOperator<R, CrossWindow<I1, I2, R>> implements
+			TemporalWindow<CrossWindow<I1, I2, R>> {
 
 		private StreamCrossOperator<I1, I2> op;
 
-		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) {
-			super(ds);
+		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<R> ds) {
+			super(ds.getExecutionEnvironment(), ds.getTransformation());
 			this.op = op;
 		}
 
-		public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+		public CrossWindow<I1, I2, R> every(long length, TimeUnit timeUnit) {
 			return every(timeUnit.toMillis(length));
 		}
 
 		@SuppressWarnings("unchecked")
-		public CrossWindow<I1, I2> every(long length) {
-			((CoStreamWindow<I1, I2, ?>) streamGraph.getStreamNode(id).getOperator())
-					.setSlideSize(length);
-			return this;
+		public CrossWindow<I1, I2, R> every(long length) {
+
+			CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
+					clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+			return (CrossWindow<I1, I2, R>) new CrossWindow<I1, I2, Tuple2<I1, I2>>(op, op.input1.connect(op.input2).addGeneralWindowCombine(
+					crossWindowFunction,
+					new TupleTypeInfo<Tuple2<I1, I2>>(op.input1.getType(), op.input2.getType()), op.windowSize,
+					length, op.timeStamp1, op.timeStamp2));
 		}
 
 		/**
@@ -97,13 +101,12 @@ public class StreamCrossOperator<I1, I2> extends
 			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
 					op.input1.getType(), op.input2.getType());
 
-			CoStreamWindow<I1, I2, R> operator = new CoStreamWindow<I1, I2, R>(
-					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
-					op.slideInterval, op.timeStamp1, op.timeStamp2);
-
-			streamGraph.setOperator(id, operator);
+			CrossWindowFunction<I1, I2, R> crossWindowFunction = new CrossWindowFunction<I1, I2, R>(clean(function));
 
-			return ((SingleOutputStreamOperator<R, ?>) this).returns(outTypeInfo);
+			return new CrossWindow<I1, I2, R>(op, op.input1.connect(op.input2).addGeneralWindowCombine(
+					crossWindowFunction,
+					outTypeInfo, op.windowSize,
+					op.slideInterval, op.timeStamp1, op.timeStamp2));
 
 		}
 

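For context, a heavily hedged sketch of how the temporal cross is typically driven. The cross()/onWindow()/with() entry points are assumed to exist in this Flink version and are not part of the hunk above; the point shown is that every() now rebuilds the windowed co-transform instead of mutating an operator in the StreamGraph.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CrossWindowSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Long> first = env.generateSequence(0, 10);
		DataStream<String> second = env.fromElements("a", "b", "c");

		// every() creates a new windowed co-transform with the new slide
		// interval rather than changing an already-registered operator.
		DataStream<Tuple2<Long, String>> crossed = first
				.cross(second)
				.onWindow(1000, TimeUnit.MILLISECONDS)
				.every(500, TimeUnit.MILLISECONDS)
				.with(new CrossFunction<Long, String, Tuple2<Long, String>>() {
					@Override
					public Tuple2<Long, String> cross(Long left, String right) {
						return new Tuple2<Long, String>(left, right);
					}
				});

		crossed.print();
		env.execute("cross window sketch");
	}
}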
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
index e18e14b..e48d707 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.co.JoinWindowFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
 public class StreamJoinOperator<I1, I2> extends
@@ -156,7 +155,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return A streaming join operator. Call {@link JoinedStream#with} to
 		 *         apply a custom wrapping
 		 */
-		public JoinedStream<I1, I2> equalTo(int... fields) {
+		public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(int... fields) {
 			keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
 					type2, op.input1.getExecutionEnvironment().getConfig());
 			return createJoinOperator();
@@ -175,7 +174,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return A streaming join operator. Call {@link JoinedStream#with} to
 		 *         apply a custom wrapping
 		 */
-		public JoinedStream<I1, I2> equalTo(String... fields) {
+		public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(String... fields) {
 			this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
 					type2), type2, op.input1.getExecutionEnvironment().getConfig());
 			return createJoinOperator();
@@ -198,12 +197,12 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return A streaming join operator. Call {@link JoinedStream#with} to
 		 *         apply a custom wrapping
 		 */
-		public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+		public <K> JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(KeySelector<I2, K> keySelector) {
 			this.keys2 = keySelector;
 			return createJoinOperator();
 		}
 
-		private JoinedStream<I1, I2> createJoinOperator() {
+		private JoinedStream<I1, I2, Tuple2<I1, I2>> createJoinOperator() {
 
 			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
 
@@ -213,42 +212,44 @@ public class StreamJoinOperator<I1, I2> extends
 			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
 					op.input1.getType(), op.input2.getType());
 
-			return new JoinedStream<I1, I2>(this, op.input1
+			return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1
 					.groupBy(keys1)
 					.connect(op.input2.groupBy(keys2))
 					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
 							op.slideInterval, op.timeStamp1, op.timeStamp2));
 		}
-	}
-
-	public static class JoinedStream<I1, I2> extends
-			SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
-		private final JoinPredicate<I1, I2> predicate;
-
-		private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) {
-			super(ds);
-			this.predicate = predicate;
-		}
-
-		/**
-		 * Completes a stream join. </p> The resulting operator wraps each pair
-		 * of joining elements using the user defined {@link JoinFunction}
-		 * 
-		 * @return The joined data stream.
-		 */
-		@SuppressWarnings("unchecked")
-		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
-
-			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
-					predicate.op.input1.getType(), predicate.op.input2.getType());
-
-			CoStreamWindow<I1, I2, OUT> operator = new CoStreamWindow<I1, I2, OUT>(
-					getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
-					predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
-
-			streamGraph.setOperator(id, operator);
 
-			return ((SingleOutputStreamOperator<OUT, ?>) this).returns(outType);
+		public static class JoinedStream<I1, I2, R> extends
+				SingleOutputStreamOperator<R, JoinedStream<I1, I2, R>> {
+			private final JoinPredicate<I1, I2> predicate;
+
+			private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<R> ds) {
+				super(ds.getExecutionEnvironment(), ds.getTransformation());
+				this.predicate = predicate;
+			}
+
+			/**
+			 * Completes a stream join. </p> The resulting operator wraps each pair
+			 * of joining elements using the user defined {@link JoinFunction}
+			 *
+			 * @return The joined data stream.
+			 */
+			@SuppressWarnings("unchecked")
+			public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
+
+				TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
+						predicate.op.input1.getType(), predicate.op.input2.getType());
+
+				JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
+
+
+				return new JoinedStream<I1, I2, OUT>(
+						predicate, predicate.op.input1
+						.groupBy(predicate.keys1)
+						.connect(predicate.op.input2.groupBy(predicate.keys2))
+						.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
+								predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
+			}
 		}
 	}
 

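Similarly, a hedged sketch of the temporal join after this change. The join()/onWindow()/where() calls are assumptions about this Flink version; equalTo() and with() follow the signatures visible above. with() now builds a fresh windowed co-transform typed to the user function instead of swapping the operator of an existing StreamGraph node.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JoinWindowSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<String, Integer>> left = env.fromElements(
				new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 2));
		DataStream<Tuple2<String, Integer>> right = env.fromElements(
				new Tuple2<String, Integer>("a", 10), new Tuple2<String, Integer>("b", 20));

		SingleOutputStreamOperator<String, ?> joined = left
				.join(right)
				.onWindow(1000, TimeUnit.MILLISECONDS)
				.where(0)
				.equalTo(0)
				.with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
					@Override
					public String join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
						return l.f0 + ":" + (l.f1 + r.f1);
					}
				});

		joined.print();
		env.execute("join window sketch");
	}
}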
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
index 3fe7eb7..9da00f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
@@ -40,8 +40,8 @@ public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
 		if (input1 == null || input2 == null) {
 			throw new NullPointerException();
 		}
-		this.input1 = input1.copy();
-		this.input2 = input2.copy();
+		this.input1 = input1;
+		this.input2 = input2;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 3efad93..58459b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -43,9 +43,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		JobExecutionResult result = ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
-				getConfig().isSysoutLoggingEnabled());
-		streamGraph.clear(); // clear graph to allow submitting another job via the same environment.
+		JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(), getParallelism(), getConfig().isSysoutLoggingEnabled());
+		transformations.clear();
 		return result;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 75e15d7..2f8938f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -84,15 +84,15 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute() throws ProgramInvocationException {
-
-		JobGraph jobGraph = streamGraph.getJobGraph();
+		JobGraph jobGraph = getStreamGraph().getJobGraph();
+		transformations.clear();
 		return executeRemotely(jobGraph);
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-
-		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
+		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+		transformations.clear();
 		return executeRemotely(jobGraph);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 9bfeb2f..c2335d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -63,15 +63,16 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		currentEnvironment = null;
 
 		JobGraph jobGraph;
 		if (jobName == null) {
-			jobGraph = this.streamGraph.getJobGraph();
+			jobGraph = this.getStreamGraph().getJobGraph();
 		} else {
-			jobGraph = this.streamGraph.getJobGraph(jobName);
+			jobGraph = this.getStreamGraph().getJobGraph(jobName);
 		}
 
+		transformations.clear();
+
 		for (File file : jars) {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}

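All three environments now follow the same pattern: the JobGraph is produced from the recorded transformations via getStreamGraph(), and the transformation list is cleared afterwards so the environment can be reused. A minimal, hypothetical driver exercising that path:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

		// The calls below only build up StreamTransformations; the StreamGraph
		// is generated lazily by getStreamGraph() when execute() runs.
		env.fromElements(1, 2, 3).print();
		env.execute("first job");

		// Because execute() cleared the recorded transformations, the same
		// environment can be reused for a second, independent topology.
		env.generateSequence(0, 5).print();
		env.execute("second job");
	}
}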
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fba4e28..a019a31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.environment;
 
 import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -60,8 +61,9 @@ import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
 
@@ -86,22 +88,26 @@ public abstract class StreamExecutionEnvironment {
 
 	private ExecutionConfig config = new ExecutionConfig();
 
-	protected static StreamExecutionEnvironment currentEnvironment;
+	protected List<StreamTransformation<?>> transformations = Lists.newArrayList();
 
-	protected StreamGraph streamGraph;
+	protected boolean isChainingEnabled = true;
+
+	protected long checkpointInterval = -1; // disabled
+
+	protected CheckpointingMode checkpointingMode = null;
+
+	protected boolean forceCheckpointing = false;
+
+	protected StateHandleProvider<?> stateHandleProvider;
+
+	/** The environment of the context (local by default, cluster if invoked through command line) */
+	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Constructor for creating StreamExecutionEnvironment
-	 */
-	protected StreamExecutionEnvironment() {
-		streamGraph = new StreamGraph(this);
-	}
-
-	/**
 	 * Gets the config object.
 	 */
 	public ExecutionConfig getConfig() {
@@ -221,10 +227,19 @@ public abstract class StreamExecutionEnvironment {
 	 * @return StreamExecutionEnvironment with chaining disabled.
 	 */
 	public StreamExecutionEnvironment disableOperatorChaining() {
-		streamGraph.setChaining(false);
+		this.isChainingEnabled = false;
 		return this;
 	}
 
+	/**
+	 * Returns whether operator chaining is enabled.
+	 *
+	 * @return {@code true} if chaining is enabled, false otherwise.
+	 */
+	public boolean isChainingEnabled() {
+		return isChainingEnabled;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing Settings
 	// ------------------------------------------------------------------------
@@ -275,10 +290,9 @@ public abstract class StreamExecutionEnvironment {
 		if (interval <= 0) {
 			throw new IllegalArgumentException("the checkpoint interval must be positive");
 		}
-		
-		streamGraph.setCheckpointingEnabled(true);
-		streamGraph.setCheckpointingInterval(interval);
-		streamGraph.setCheckpointingMode(mode);
+
+		this.checkpointInterval = interval;
+		this.checkpointingMode = mode;
 		return this;
 	}
 	
@@ -303,19 +317,9 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@Deprecated
 	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
-		if (mode == null) {
-			throw new NullPointerException("checkpoint mode must not be null");
-		}
-		if (interval <= 0) {
-			throw new IllegalArgumentException("the checkpoint interval must be positive");
-		}
-		
-		streamGraph.setCheckpointingEnabled(true);
-		streamGraph.setCheckpointingInterval(interval);
-		streamGraph.setCheckpointingMode(mode);
-		if (force) {
-			streamGraph.forceCheckpoint();
-		}
+		this.enableCheckpointing(interval, mode);
+
+		this.forceCheckpointing = force;
 		return this;
 	}
 
@@ -334,12 +338,35 @@ public abstract class StreamExecutionEnvironment {
 	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
 	 */
 	public StreamExecutionEnvironment enableCheckpointing() {
-		streamGraph.setCheckpointingEnabled(true);
-		streamGraph.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+		enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
 		return this;
 	}
 
 	/**
+	 * Returns the checkpointing interval or -1 if checkpointing is disabled.
+	 *
+	 * @return The checkpointing interval or -1
+	 */
+	public long getCheckpointInterval() {
+		return checkpointInterval;
+	}
+
+
+	/**
+	 * Returns whether checkpointing is force-enabled.
+	 */
+	public boolean isForceCheckpointing() {
+		return forceCheckpointing;
+	}
+
+	/**
+	 * Returns the {@link CheckpointingMode}.
+	 */
+	public CheckpointingMode getCheckpointingMode() {
+		return checkpointingMode;
+	}
+
+	/**
 	 * Sets the {@link StateHandleProvider} used for storing operator state
 	 * checkpoints when checkpointing is enabled.
 	 * <p>
@@ -348,11 +375,22 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 */
 	public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
-		streamGraph.setStateHandleProvider(provider);
+		this.stateHandleProvider = provider;
 		return this;
 	}
 
 	/**
+	 * Returns the {@link org.apache.flink.runtime.state.StateHandle}
+	 *
+	 * @see #setStateHandleProvider(org.apache.flink.runtime.state.StateHandleProvider)
+	 *
+	 * @return The StateHandleProvider
+	 */
+	public StateHandleProvider<?> getStateHandleProvider() {
+		return stateHandleProvider;
+	}
+
+	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of
 	 * zero effectively disables fault tolerance. A value of {@code -1}
 	 * indicates that the system default value (as defined in the configuration)
@@ -591,7 +629,7 @@ public abstract class StreamExecutionEnvironment {
 		
 		// must not have null elements and mixed elements
 		FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
-		
+
 		SourceFunction<OUT> function;
 		try {
 			function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
@@ -599,7 +637,7 @@ public abstract class StreamExecutionEnvironment {
 		catch (IOException e) {
 			throw new RuntimeException(e.getMessage(), e);
 		}
-		return addSource(function, "Collection Source", typeInfo);
+		return addSource(function, "Collection Source", typeInfo).setParallelism(1);
 	}
 
 	/**
@@ -980,15 +1018,12 @@ public abstract class StreamExecutionEnvironment {
 	private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
 			TypeInformation<OUT> typeInfo, String sourceName) {
 		FileSourceFunction<OUT> function = new FileSourceFunction<OUT>(inputFormat, typeInfo);
-		DataStreamSource<OUT> returnStream = addSource(function, sourceName).returns(typeInfo);
-		streamGraph.setInputFormat(returnStream.getId(), inputFormat);
-		return returnStream;
+		return addSource(function, sourceName).returns(typeInfo);
 	}
 
 	/**
-	 * Adds a data source with a custom type information thus opening a
-	 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases does the user need
-	 * to support type information. Otherwise use {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+	 * Adds a Data Source to the streaming topology.
+	 *
 	 * <p>
 	 * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
 	 * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
@@ -1078,10 +1113,9 @@ public abstract class StreamExecutionEnvironment {
 		boolean isParallel = function instanceof ParallelSourceFunction;
 
 		clean(function);
-		StreamOperator<OUT> sourceOperator = new StreamSource<OUT>(function);
+		StreamSource<OUT> sourceOperator = new StreamSource<OUT>(function);
 
-		return new DataStreamSource<OUT>(this, sourceName, typeInfo, sourceOperator,
-				isParallel, sourceName);
+		return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1098,20 +1132,20 @@ public abstract class StreamExecutionEnvironment {
 	 * executed.
 	 */
 	public static StreamExecutionEnvironment getExecutionEnvironment() {
-		if (currentEnvironment != null) {
-			return currentEnvironment;
+		if (contextEnvironmentFactory != null) {
+			return contextEnvironmentFactory.createExecutionEnvironment();
 		}
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
 			ContextEnvironment ctx = (ContextEnvironment) env;
-			currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(),
+			return createContextEnvironment(ctx.getClient(), ctx.getJars(),
 					ctx.getParallelism(), ctx.isWait());
 		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
-			currentEnvironment = new StreamPlanEnvironment(env);
+			return new StreamPlanEnvironment(env);
 		} else {
 			return createLocalEnvironment();
 		}
-		return currentEnvironment;
 	}
 
 	private static StreamExecutionEnvironment createContextEnvironment(Client client,
@@ -1143,9 +1177,9 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
-		currentEnvironment = new LocalStreamEnvironment();
-		currentEnvironment.setParallelism(parallelism);
-		return (LocalStreamEnvironment) currentEnvironment;
+		LocalStreamEnvironment env = new LocalStreamEnvironment();
+		env.setParallelism(parallelism);
+		return env;
 	}
 
 	// TODO:fix cluster default parallelism
@@ -1172,8 +1206,8 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
 			String... jarFiles) {
-		currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
-		return currentEnvironment;
+		RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
+		return env;
 	}
 
 	/**
@@ -1199,9 +1233,9 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
 			int parallelism, String... jarFiles) {
-		currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
-		currentEnvironment.setParallelism(parallelism);
-		return currentEnvironment;
+		RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
+		env.setParallelism(parallelism);
+		return env;
 	}
 
 	/**
@@ -1239,7 +1273,11 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The streamgraph representing the transformations
 	 */
 	public StreamGraph getStreamGraph() {
-		return streamGraph;
+		if (transformations.size() <= 0) {
+			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
+		}
+		StreamGraph result = StreamGraphGenerator.generate(this, transformations);
+		return result;
 	}
 
 	/**
@@ -1254,10 +1292,6 @@ public abstract class StreamExecutionEnvironment {
 		return getStreamGraph().getStreamingPlanAsJSON();
 	}
 
-	protected static void initializeFromFactory(StreamExecutionEnvironmentFactory eef) {
-		currentEnvironment = eef.createExecutionEnvironment();
-	}
-
 	/**
 	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
 	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
@@ -1270,4 +1304,28 @@ public abstract class StreamExecutionEnvironment {
 		return f;
 	}
 
+	/**
+	 * Adds an operator to the list of operators that should be executed when calling
+	 * {@link #execute}.
+	 *
+	 * <p>
+	 * When calling {@link #execute()} only the operators that were previously added to the list
+	 * are executed.
+	 *
+	 * <p>
+	 * This is not meant to be used by users. The API methods that create operators must call
+	 * this method.
+	 */
+	public void addOperator(StreamTransformation<?> transformation) {
+		Preconditions.checkNotNull(transformation, "Transformations must not be null.");
+		this.transformations.add(transformation);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Methods to control the context and local environments for execution from packaged programs
+	// --------------------------------------------------------------------------------------------
+
+	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
+		contextEnvironmentFactory = ctx;
+	}
 }
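
The StreamExecutionEnvironment hunks above replace the eagerly mutated StreamGraph with a list of recorded StreamTransformations that is only translated when getStreamGraph() is called. The following standalone sketch is not part of the commit; it is only meant to illustrate, using the API names visible in this diff, how that lazy behaviour looks from a user program:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyStreamGraphSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Before any operator is defined, getStreamGraph() now throws an
		// IllegalStateException instead of returning an empty graph.

		// Collection-based sources are pinned to parallelism 1 by the
		// addSource(...).setParallelism(1) change above.
		DataStream<Integer> numbers = env.fromElements(1, 2, 3);

		// Each API call only records a StreamTransformation via env.addOperator(...);
		// nothing is translated into a StreamGraph yet.
		numbers.print();

		// Only here is the StreamGraph generated, by StreamGraphGenerator.generate(env, transformations).
		System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());

		env.execute("Lazy StreamGraph sketch");
	}
}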

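The addSource javadoc above points users to ParallelSourceFunction / RichParallelSourceFunction when a source should run with a parallelism greater than 1. A minimal illustrative source, assuming the run(SourceContext)/cancel() contract of SourceFunction at this point in time (example code, not taken from the commit):

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Minimal parallel source sketch: every parallel instance emits its own counter values.
public class CounterSource extends RichParallelSourceFunction<Long> {

	private volatile boolean isRunning = true;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		long counter = 0;
		while (isRunning && counter < 1000) {
			ctx.collect(counter++);
		}
	}

	@Override
	public void cancel() {
		isRunning = false;
	}
}

Something like env.addSource(new CounterSource()).setParallelism(4) would then run four instances, while a plain SourceFunction stays at parallelism 1.
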
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 02fccd0..8c1408e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 
 public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 
@@ -55,10 +56,12 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		currentEnvironment = null;
 
+		StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 
+		transformations.clear();
+
 		if (env instanceof OptimizerPlanEnvironment) {
 			((OptimizerPlanEnvironment) env).setPlan(streamGraph);
 		} else if (env instanceof PreviewPlanEnvironment) {

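The pattern introduced here -- generate the StreamGraph from the recorded transformations, set the job name, then clear the transformation list -- is the shape the other environments are expected to follow as well. A condensed, hypothetical subclass only meant to show that shape (it assumes the transformations field is accessible within this package and that both execute() overloads are the abstract methods to implement):

package org.apache.flink.streaming.api.environment;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.graph.StreamGraph;

// Hypothetical environment used only to illustrate the generate/name/clear pattern.
public class SketchPlanEnvironment extends StreamExecutionEnvironment {

	@Override
	public JobExecutionResult execute() throws Exception {
		return execute("sketch job");
	}

	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
		StreamGraph streamGraph = getStreamGraph(); // built from the recorded transformations
		streamGraph.setJobName(jobName);
		transformations.clear(); // reset so a new topology can be recorded afterwards

		// A plan-only environment would hand streamGraph to its wrapping environment here
		// instead of actually running the job; returning null just keeps the sketch compilable.
		return null;
	}
}
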
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index cf08e5a..253c076 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -134,4 +134,12 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 		isRunning = false;
 	}
 
+
+	/**
+	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
+	 * split assigner on the {@code StreamGraph}.
+	 */
+	public InputFormat<OUT, InputSplit> getFormat() {
+		return format;
+	}
 }
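
The accessor added above exists so that the graph generator can pull the InputFormat back out of the source function when wiring up input splits (the setInputFormat call was removed from createInput() in the StreamExecutionEnvironment hunk earlier in this commit). A small hypothetical helper, with made-up names, showing the kind of lookup this enables:

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

final class InputFormatLookup {

	// Returns the InputFormat carried by a file source, or null for other sources.
	// A graph builder could use this to call setInputFormat(vertexId, format) on the StreamGraph.
	static InputFormat<?, ?> formatOf(SourceFunction<?> function) {
		if (function instanceof FileSourceFunction) {
			return ((FileSourceFunction<?>) function).getFormat();
		}
		return null;
	}
}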

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index bc20fff..743ee4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +52,19 @@ public class JSONGenerator {
 		JSONArray nodes = new JSONArray();
 		json.put("nodes", nodes);
 		List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
-		Collections.sort(operatorIDs);
+		Collections.sort(operatorIDs, new Comparator<Integer>() {
+			@Override
+			public int compare(Integer o1, Integer o2) {
+				// put sinks at the back
+				if (streamGraph.getSinkIDs().contains(o1)) {
+					return 1;
+				} else if (streamGraph.getSinkIDs().contains(o2)) {
+					return -1;
+				} else {
+					return o1 - o2;
+				}
+			}
+		});
 		visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
 		return json.toString();
 	}
@@ -87,7 +100,7 @@ public class JSONGenerator {
 			for (StreamEdge inEdge : vertex.getInEdges()) {
 				int operator = inEdge.getSourceId();
 
-				if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
+				if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
 					iterationHead = operator;
 				}
 			}
@@ -119,7 +132,7 @@ public class JSONGenerator {
 		toVisit.remove(vertexID);
 
 		// Ignoring head and tail to avoid redundancy
-		if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) {
+		if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
 			JSONObject obj = new JSONObject();
 			jsonArray.put(obj);
 			decorateNode(vertexID, obj);
@@ -131,7 +144,7 @@ public class JSONGenerator {
 
 				if (edgeRemapings.keySet().contains(inputID)) {
 					decorateEdge(inEdges, vertexID, inputID, inputID);
-				} else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) {
+				} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
 					decorateEdge(iterationInEdges, vertexID, inputID, inputID);
 				}
 			}
@@ -147,8 +160,7 @@ public class JSONGenerator {
 		JSONObject input = new JSONObject();
 		inputArray.put(input);
 		input.put(ID, mappedInputID);
-		input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner()
-				.getStrategy());
+		input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner());
 		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
 	}
 
@@ -161,8 +173,10 @@ public class JSONGenerator {
 
 		if (streamGraph.getSourceIDs().contains(vertexID)) {
 			node.put(PACT, "Data Source");
+		} else if (streamGraph.getSinkIDs().contains(vertexID)) {
+			node.put(PACT, "Data Sink");
 		} else {
-			node.put(PACT, "Data Stream");
+			node.put(PACT, "Operator");
 		}
 
 		StreamOperator<?> operator = streamGraph.getStreamNode(vertexID).getOperator();

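The comparator above only affects the order in which nodes appear in the generated JSON plan: sink IDs compare as greater than everything else, so sinks are listed last. A self-contained toy program (plain Java, not Flink code) demonstrating the same ordering rule:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SinkLastOrdering {

	public static void main(String[] args) {
		final Set<Integer> sinkIds = new HashSet<Integer>(Arrays.asList(2));
		List<Integer> ids = new ArrayList<Integer>(Arrays.asList(5, 3, 2, 1, 4));

		Collections.sort(ids, new Comparator<Integer>() {
			@Override
			public int compare(Integer o1, Integer o2) {
				// sink ids compare as "greater", everything else keeps ascending id order
				if (sinkIds.contains(o1)) {
					return 1;
				} else if (sinkIds.contains(o2)) {
					return -1;
				} else {
					return o1 - o2;
				}
			}
		});

		System.out.println(ids); // prints [1, 3, 4, 5, 2]: plain ids ascending, the sink id last
	}
}
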
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 4f19db6..2c422d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -257,29 +257,6 @@ public class StreamConfig implements Serializable {
 		return config.getLong(ITERATON_WAIT, 0);
 	}
 
-	public void setSelectedNames(Integer output, List<String> selected) {
-		if (selected == null) {
-			selected = new ArrayList<String>();
-		}
-
-		try {
-			InstantiationUtil.writeObjectToConfig(selected, this.config, OUTPUT_NAME + output);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize OutputSelector for name \"" + output+ "\".", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<String> getSelectedNames(Integer output, ClassLoader cl) {
-		List<String> selectedNames;
-		try {
-			selectedNames = (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_NAME + output, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Cannot deserialize OutputSelector for name \"" + output + "\".", e);
-		}
-		return selectedNames == null ? new ArrayList<String>() : selectedNames;
-	}
-
 	public void setNumberOfInputs(int numberOfInputs) {
 		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
 	}

