flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [4/8] flink git commit: [FLINK-1345] [streaming] Advanced task chaining added
Date Wed, 21 Jan 2015 17:53:58 GMT
[FLINK-1345] [streaming] Advanced task chaining added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbb55ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbb55ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbb55ec

Branch: refs/heads/master
Commit: 7dbb55ece0a9d9777c0e3254bc8f9f5cf566d535
Parents: 3e30c6f
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Jan 18 18:23:57 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 298 ++++++++-----------
 .../flink/streaming/api/StreamConfig.java       | 180 +++++++----
 .../api/collector/CollectorWrapper.java         |   2 +-
 .../streaming/api/collector/StreamOutput.java   |  15 +
 .../api/collector/StreamOutputWrapper.java      |   6 -
 .../api/streamvertex/OutputHandler.java         | 204 ++++++++-----
 .../api/streamvertex/StreamIterationHead.java   |  23 +-
 .../flink/streaming/api/scala/DataStream.scala  |  10 +
 8 files changed, 433 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 0020d48..6ae97c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,9 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -85,11 +85,11 @@ public class JobGraphBuilder {
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
 	private Map<String, InputFormat<String, ?>> inputFormatList;
-	private Map<String, List<String>> chainedVertices;
-	private Map<String, String> lastInChains;
+	private Map<String, Map<String, StreamConfig>> chainedConfigs;
+	private Map<String, StreamConfig> vertexConfigs;
 
 	private Set<String> sources;
-	private Set<String> builtVertices;
+	private Set<String> builtNodes;
 
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -129,11 +129,11 @@ public class JobGraphBuilder {
 		iterationWaitTime = new HashMap<String, Long>();
 		operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
 		inputFormatList = new HashMap<String, InputFormat<String, ?>>();
-		chainedVertices = new HashMap<String, List<String>>();
-		lastInChains = new HashMap<String, String>();
+		chainedConfigs = new HashMap<String, Map<String, StreamConfig>>();
+		vertexConfigs = new HashMap<String, StreamConfig>();
 
 		sources = new HashSet<String>();
-		builtVertices = new HashSet<String>();
+		builtNodes = new HashSet<String>();
 	}
 
 	/**
@@ -198,6 +198,8 @@ public class JobGraphBuilder {
 
 		addVertex(vertexName, StreamIterationHead.class, null, null, parallelism);
 
+		chaining = false;
+
 		iterationIds.put(vertexName, iterationID);
 		iterationIDtoHeadName.put(iterationID, vertexName);
 
@@ -297,7 +299,6 @@ public class JobGraphBuilder {
 		inEdgeList.put(vertexName, new ArrayList<String>());
 		outPartitioning.put(vertexName, new ArrayList<StreamPartitioner<?>>());
 		iterationTailCount.put(vertexName, 0);
-		lastInChains.put(vertexName, vertexName);
 	}
 
 	private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
@@ -309,170 +310,175 @@ public class JobGraphBuilder {
 		typeSerializersOut2.put(vertexName, out2);
 	}
 
-	/**
-	 * Creates an {@link AbstractJobVertex} in the {@link JobGraph} and sets its
-	 * config parameters using the ones set previously.
-	 * 
-	 * @param vertexName
-	 *            Name for which the vertex will be created.
-	 */
-	private void createVertex(String vertexName) {
+	private List<Tuple2<String, String>> createChain(String startNode, String current) {
 
-		if (!builtVertices.contains(vertexName)) {
-			if (!outEdgeList.get(vertexName).isEmpty()) {
+		if (!builtNodes.contains(startNode)) {
 
-				for (String outName : outEdgeList.get(vertexName)) {
-					if (isChainable(vertexName, outName)) {
-						chainRecursively(vertexName, vertexName, outName);
-					} else {
-						createVertex(outName);
-					}
+			List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>();
+			List<String> chainableOutputs = new ArrayList<String>();
+			List<String> nonChainableOutputs = new ArrayList<String>();
 
+			for (String outName : outEdgeList.get(current)) {
+				if (isChainable(current, outName)) {
+					chainableOutputs.add(outName);
+				} else {
+					nonChainableOutputs.add(outName);
 				}
+
 			}
 
-			List<String> chainedNames = chainedVertices.get(vertexName);
-			boolean isChained = chainedNames != null;
-			int numChained = isChained ? chainedNames.size() : 0;
-			String lastInChain = lastInChains.get(vertexName);
-
-			// Get vertex attributes
-			Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
-			StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
-			int parallelism = vertexParallelism.get(vertexName);
-			byte[] outputSelector = outputSelectors.get(lastInChain);
-			Map<String, OperatorState<?>> state = operatorStates.get(vertexName);
-
-			// Create vertex object
-			String cname = chainedVertices.get(vertexName) == null ? "" : " => "
-					+ StringUtils.join(chainedVertices.get(vertexName), " => ");
-			AbstractJobVertex vertex = new AbstractJobVertex(vertexName + cname);
-
-			this.jobGraph.addVertex(vertex);
-
-			vertex.setInvokableClass(vertexClass);
-			vertex.setParallelism(parallelism);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Parallelism set: {} for {}", parallelism, vertexName);
+			for (String chainable : chainableOutputs) {
+				transitiveOutEdges.addAll(createChain(startNode, chainable));
 			}
 
-			// Set vertex config
+			for (String nonChainable : nonChainableOutputs) {
+				transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable));
+				transitiveOutEdges.addAll(createChain(nonChainable, nonChainable));
+			}
 
-			StreamConfig config = new StreamConfig(vertex.getConfiguration());
+			StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode)
+					: new StreamConfig(new Configuration());
 
-			config.setBufferTimeout(bufferTimeout.get(lastInChain));
+			setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
 
-			config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
-			config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
-			config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
-			config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+			if (current.equals(startNode)) {
 
-			config.setUserInvokable(invokableObject);
-			config.setOutputSelector(outputSelector);
-			config.setOperatorStates(state);
+				config.setChainStart();
+				config.setRecordWriterOrder(transitiveOutEdges);
 
-			if (vertexClass.equals(StreamIterationHead.class)
-					|| vertexClass.equals(StreamIterationTail.class)) {
-				config.setIterationId(iterationIds.get(vertexName));
-				config.setIterationWaitTime(iterationWaitTime.get(vertexName));
-			}
+				for (Tuple2<String, String> edge : transitiveOutEdges) {
+					connect(startNode, edge);
+				}
 
-			if (inputFormatList.containsKey(vertexName)) {
-				vertex.setInputSplitSource(inputFormatList.get(vertexName));
-			}
+				vertexConfigs.get(startNode).setTransitiveChainedTaskConfigs(
+						chainedConfigs.get(startNode));
+
+			} else {
 
-			config.setNumberofChainedTasks(numChained);
+				Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode);
 
-			for (int i = 0; i < numChained; i++) {
-				config.setChainedInvokable(
-						(ChainableInvokable<?, ?>) invokableObjects.get(chainedNames.get(i)), i);
-				config.setChainedSerializer(typeSerializersIn1.get(chainedNames.get(i)), i);
+				if (chainedConfs == null) {
+					chainedConfigs.put(startNode, new HashMap<String, StreamConfig>());
+				}
+				chainedConfigs.get(startNode).put(current, config);
 			}
 
-			streamVertices.put(vertexName, vertex);
-			builtVertices.add(vertexName);
+			return transitiveOutEdges;
 
+		} else {
+			return new ArrayList<Tuple2<String, String>>();
 		}
+	}
+
+	private StreamConfig createProcessingVertex(String vertexName) {
+
+		AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
+
+		this.jobGraph.addVertex(vertex);
 
+		int parallelism = vertexParallelism.get(vertexName);
+
+		vertex.setInvokableClass(vertexClasses.get(vertexName));
+		vertex.setParallelism(parallelism);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Parallelism set: {} for {}", parallelism, vertexName);
+		}
+
+		if (inputFormatList.containsKey(vertexName)) {
+			vertex.setInputSplitSource(inputFormatList.get(vertexName));
+		}
+
+		streamVertices.put(vertexName, vertex);
+		builtNodes.add(vertexName);
+
+		return new StreamConfig(vertex.getConfiguration());
 	}
 
-	private void chainRecursively(String chainStart, String current, String next) {
-		// We chain the next operator to the start of this chain
-		chainTasks(chainStart, next);
-		// Now recursively chain the outputs of next (depth first)
-		for (String output : outEdgeList.get(next)) {
-			if (isChainable(next, output)) {
-				// Recursive call
-				chainRecursively(chainStart, next, output);
-			} else {
-				// If not chainable we continue building the jobgraph from there
-				createVertex(output);
-			}
+	private void setVertexConfig(String vertexName, StreamConfig config,
+			List<String> chainableOutputs, List<String> nonChainableOutputs) {
+
+		StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
+		byte[] outputSelector = outputSelectors.get(vertexName);
+		Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
+		Map<String, OperatorState<?>> state = operatorStates.get(vertexName);
+
+		config.setVertexName(vertexName);
+
+		config.setBufferTimeout(bufferTimeout.get(vertexName));
+
+		config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+		config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+		config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+		config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+
+		config.setUserInvokable(invokableObject);
+		config.setOutputSelector(outputSelector);
+		config.setOperatorStates(state);
+
+		config.setNumberOfOutputs(nonChainableOutputs.size());
+		config.setOutputs(nonChainableOutputs);
+		config.setChainedOutputs(chainableOutputs);
+
+		if (vertexClass.equals(StreamIterationHead.class)
+				|| vertexClass.equals(StreamIterationTail.class)) {
+			config.setIterationId(iterationIds.get(vertexName));
+			config.setIterationWaitTime(iterationWaitTime.get(vertexName));
 		}
+
+		vertexConfigs.put(vertexName, config);
 	}
 
 	private boolean isChainable(String vertexName, String outName) {
-		return outEdgeList.get(vertexName).size() == 1
-				&& inEdgeList.get(outName).size() == 1
+		return inEdgeList.get(outName).size() == 1
+				&& invokableObjects.get(outName) != null
 				&& outputSelectors.get(vertexName) == null
 				&& invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS
 				&& (invokableObjects.get(vertexName).getChainingStrategy() == ChainingStrategy.HEAD || invokableObjects
 						.get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS)
-				&& outPartitioning.get(vertexName).get(0).getStrategy() == PartitioningStrategy.FORWARD
+				&& outPartitioning.get(vertexName)
+						.get(outEdgeList.get(vertexName).indexOf(outName)).getStrategy() == PartitioningStrategy.FORWARD
 				&& vertexParallelism.get(vertexName) == vertexParallelism.get(outName) && chaining;
 	}
 
-	private void chainTasks(String first, String second) {
+	private <T> void connect(String headOfChain, Tuple2<String, String> edge) {
 
-		List<String> chained = chainedVertices.get(first);
-		if (chained == null) {
-			chained = new ArrayList<String>();
-		}
-		chained.add(second);
-		chainedVertices.put(first, chained);
-		lastInChains.put(first, second);
-
-	}
+		String upStreamVertexName = edge.f0;
+		String downStreamVertexName = edge.f1;
 
-	/**
-	 * Connects two vertices with the given names, partitioning and channel type
-	 * 
-	 * @param upStreamVertexName
-	 *            Name of the upstream vertex, that will emit the values
-	 * @param downStreamVertexName
-	 *            Name of the downstream vertex, that will receive the values
-	 * @param partitionerObject
-	 *            The partitioner
-	 */
-	private <T> void connect(String upStreamVertexName, String downStreamVertexName,
-			StreamPartitioner<T> partitionerObject) {
+		int outputIndex = outEdgeList.get(upStreamVertexName).indexOf(downStreamVertexName);
 
-		AbstractJobVertex upStreamVertex = streamVertices.get(upStreamVertexName);
+		AbstractJobVertex headVertex = streamVertices.get(headOfChain);
 		AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName);
 
-		StreamConfig config = new StreamConfig(upStreamVertex.getConfiguration());
+		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
+		StreamConfig upStreamConfig = new StreamConfig(headVertex.getConfiguration());
+
+		List<Integer> outEdgeIndexList = outEdgeIndex.get(upStreamVertexName);
+		int numOfInputs = downStreamConfig.getNumberOfInputs();
+
+		downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex));
+		downStreamConfig.setNumberOfInputs(numOfInputs);
+
+		StreamPartitioner<?> partitionerObject = outPartitioning.get(upStreamVertexName).get(
+				outputIndex);
+
+		upStreamConfig.setPartitioner(downStreamVertexName, partitionerObject);
 
 		if (partitionerObject.getStrategy() == PartitioningStrategy.FORWARD) {
-			downStreamVertex
-					.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
+			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
 		} else {
-			downStreamVertex.connectNewDataSetAsInput(upStreamVertex,
-					DistributionPattern.ALL_TO_ALL);
+			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
 		}
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
-					upStreamVertexName, downStreamVertexName);
+					headOfChain, downStreamVertexName);
 		}
 
-		int outputIndex = upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1;
-
-		config.setOutputName(outputIndex, outEdgeNames.get(lastInChains.get(upStreamVertexName))
+		upStreamConfig.setOutputNames(downStreamVertexName, outEdgeNames.get(upStreamVertexName)
 				.get(outputIndex));
-		config.setSelectAll(outputIndex, outEdgeSelectAll.get(lastInChains.get(upStreamVertexName))
+		upStreamConfig.setSelectAll(downStreamVertexName, outEdgeSelectAll.get(upStreamVertexName)
 				.get(outputIndex));
-		config.setPartitioner(outputIndex, partitionerObject);
-		config.setNumberOfOutputChannels(outputIndex, vertexParallelism.get(downStreamVertexName));
 	}
 
 	/**
@@ -630,27 +636,6 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Writes number of inputs into each JobVertex's config
-	 */
-	private void setNumberOfJobInputs() {
-		for (AbstractJobVertex vertex : streamVertices.values()) {
-			(new StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex
-					.getNumberOfInputs());
-		}
-	}
-
-	/**
-	 * Writes the number of outputs and output channels into each JobVertex's
-	 * config
-	 */
-	private void setNumberOfJobOutputs() {
-		for (AbstractJobVertex vertex : streamVertices.values()) {
-			(new StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex
-					.getNumberOfProducedIntermediateDataSets());
-		}
-	}
-
-	/**
 	 * Gets the assembled {@link JobGraph} and adds a default name for it.
 	 */
 	public JobGraph getJobGraph() {
@@ -677,33 +662,10 @@ public class JobGraphBuilder {
 	private void buildJobGraph() {
 
 		for (String sourceName : sources) {
-			createVertex(sourceName);
-		}
-
-		for (String upStreamVertexName : builtVertices) {
-			int i = 0;
-
-			List<Integer> outEdgeTypeList = outEdgeIndex.get(lastInChains.get(upStreamVertexName));
-
-			for (String downStreamVertexName : outEdgeList
-					.get(lastInChains.get(upStreamVertexName))) {
-				StreamConfig downStreamVertexConfig = new StreamConfig(streamVertices.get(
-						downStreamVertexName).getConfiguration());
-
-				int inputNumber = downStreamVertexConfig.getNumberOfInputs();
-
-				downStreamVertexConfig.setInputIndex(inputNumber++, outEdgeTypeList.get(i));
-				downStreamVertexConfig.setNumberOfInputs(inputNumber);
-
-				connect(upStreamVertexName, downStreamVertexName,
-						outPartitioning.get(lastInChains.get(upStreamVertexName)).get(i));
-				i++;
-			}
+			createChain(sourceName, sourceName);
 		}
 
 		setSlotSharing();
-		setNumberOfJobInputs();
-		setNumberOfJobOutputs();
 	}
 
 	public void setChaining(boolean chaining) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index ada3aae..6fffaa6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api;
 
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -35,17 +35,20 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
-public class StreamConfig {
+public class StreamConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
 	private static final String INPUT_TYPE = "inputType_";
 	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
 	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-	private static final String NUMBER_OF_CHAINED_TASKS = "numOfChained";
-	private static final String CHAINED_IN_SERIALIZER = "chainedSerializer_";
-	private static final String CHAINED_INVOKABLE = "chainedInvokable_";
+	private static final String CHAINED_OUTPUTS = "chainedOutputs";
+	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
 	private static final String OUTPUT_NAME = "outputName_";
 	private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
 	private static final String PARTITIONER_OBJECT = "partitionerObject_";
-	private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
+	private static final String VERTEX_NAME = "vertexName";
 	private static final String ITERATION_ID = "iteration-id";
 	private static final String OUTPUT_SELECTOR = "outputSelector";
 	private static final String DIRECTED_EMIT = "directedEmit";
@@ -58,6 +61,8 @@ public class StreamConfig {
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
 	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
 	private static final String ITERATON_WAIT = "iterationWait";
+	private static final String OUTPUTS = "outVertexNames";
+	private static final String RW_ORDER = "rwOrder";
 
 	// DEFAULT VALUES
 
@@ -75,6 +80,14 @@ public class StreamConfig {
 		return config;
 	}
 
+	public void setVertexName(String vertexName) {
+		config.setString(VERTEX_NAME, vertexName);
+	}
+
+	public String getTaskName() {
+		return config.getString(VERTEX_NAME, "Missing");
+	}
+
 	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}
@@ -206,25 +219,21 @@ public class StreamConfig {
 		return config.getLong(ITERATON_WAIT, 0);
 	}
 
-	public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
-		config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
-	}
+	public <T> void setPartitioner(String output, StreamPartitioner<T> partitionerObject) {
 
-	public int getNumberOfOutputChannels(int outputIndex) {
-		return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
-	}
-
-	public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> partitionerObject) {
-
-		config.setBytes(PARTITIONER_OBJECT + outputIndex,
+		config.setBytes(PARTITIONER_OBJECT + output,
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
-			throws ClassNotFoundException, IOException {
-		@SuppressWarnings("unchecked")
-		StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
-				.readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+	@SuppressWarnings("unchecked")
+	public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String output) {
+		StreamPartitioner<T> partitioner = null;
+		try {
+			partitioner = (StreamPartitioner<T>) InstantiationUtil.readObjectFromConfig(
+					this.config, PARTITIONER_OBJECT + output, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Partitioner could not be instantiated.");
+		}
 		if (partitioner != null) {
 			return partitioner;
 		} else {
@@ -232,27 +241,27 @@ public class StreamConfig {
 		}
 	}
 
-	public void setSelectAll(int outputIndex, Boolean selectAll) {
+	public void setSelectAll(String output, Boolean selectAll) {
 		if (selectAll != null) {
-			config.setBoolean(OUTPUT_SELECT_ALL + outputIndex, selectAll);
+			config.setBoolean(OUTPUT_SELECT_ALL + output, selectAll);
 		}
 	}
 
-	public boolean isSelectAll(int outputIndex) {
-		return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, false);
+	public boolean isSelectAll(String output) {
+		return config.getBoolean(OUTPUT_SELECT_ALL + output, true);
 	}
 
-	public void setOutputName(int outputIndex, List<String> outputName) {
+	public void setOutputNames(String output, List<String> outputName) {
 		if (outputName != null) {
-			config.setBytes(OUTPUT_NAME + outputIndex,
+			config.setBytes(OUTPUT_NAME + output,
 					SerializationUtils.serialize((Serializable) outputName));
 		}
 	}
 
 	@SuppressWarnings("unchecked")
-	public List<String> getOutputNames(int outputIndex) {
-		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
-				+ outputIndex, null));
+	public List<String> getOutputNames(String output) {
+		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
+				null));
 	}
 
 	public void setNumberOfInputs(int numberOfInputs) {
@@ -271,6 +280,38 @@ public class StreamConfig {
 		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
 	}
 
+	public void setOutputs(List<String> outputVertexNames) {
+		config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputVertexNames));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getOutputs(ClassLoader cl) {
+		try {
+			return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setRecordWriterOrder(List<Tuple2<String, String>> outEdgeList) {
+
+		List<String> outVertices = new ArrayList<String>();
+		for (Tuple2<String, String> edge : outEdgeList) {
+			outVertices.add(edge.f1);
+		}
+
+		config.setBytes(RW_ORDER, SerializationUtils.serialize((Serializable) outVertices));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getRecordWriterOrder(ClassLoader cl) {
+		try {
+			return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, RW_ORDER, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
 	public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
 		config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
 	}
@@ -293,40 +334,77 @@ public class StreamConfig {
 		}
 	}
 
-	public int getNumberofChainedTasks() {
-		return config.getInteger(NUMBER_OF_CHAINED_TASKS, 0);
+	public void setChainedOutputs(List<String> chainedOutputs) {
+		config.setBytes(CHAINED_OUTPUTS,
+				SerializationUtils.serialize((Serializable) chainedOutputs));
 	}
 
-	public void setNumberofChainedTasks(int n) {
-		config.setInteger(NUMBER_OF_CHAINED_TASKS, n);
-	}
-
-	public ChainableInvokable<?, ?> getChainedInvokable(int chainedTaskIndex, ClassLoader cl) {
+	@SuppressWarnings("unchecked")
+	public List<String> getChainedOutputs(ClassLoader cl) {
 		try {
-			return (ChainableInvokable<?, ?>) InstantiationUtil.readObjectFromConfig(this.config,
-					CHAINED_INVOKABLE + chainedTaskIndex, cl);
+			return (List<String>) InstantiationUtil.readObjectFromConfig(this.config,
+					CHAINED_OUTPUTS, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate invokable.");
+			throw new RuntimeException("Could not instantiate chained outputs.");
 		}
 	}
 
-	public StreamRecordSerializer<?> getChainedInSerializer(int chainedTaskIndex, ClassLoader cl) {
+	public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> chainedTaskConfigs) {
+		config.setBytes(CHAINED_TASK_CONFIG,
+				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
 		try {
-			return (StreamRecordSerializer<?>) InstantiationUtil.readObjectFromConfig(this.config,
-					CHAINED_IN_SERIALIZER + chainedTaskIndex, cl);
+
+			return (Map<String, StreamConfig>) InstantiationUtil.readObjectFromConfig(this.config,
+					CHAINED_TASK_CONFIG, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
+			throw new RuntimeException("Could not instantiate configuration.");
 		}
 	}
 
-	public void setChainedSerializer(StreamRecordSerializer<?> typeWrapper, int chainedTaskIndex) {
-		config.setBytes(CHAINED_IN_SERIALIZER + chainedTaskIndex,
-				SerializationUtils.serialize(typeWrapper));
+	public void setChainStart() {
+		config.setBoolean(IS_CHAINED_VERTEX, true);
 	}
 
-	public void setChainedInvokable(ChainableInvokable<?, ?> invokable, int chainedTaskIndex) {
-		config.setBytes(CHAINED_INVOKABLE + chainedTaskIndex,
-				SerializationUtils.serialize(invokable));
+	public boolean isChainStart() {
+		return config.getBoolean(IS_CHAINED_VERTEX, false);
 	}
 
+	@Override
+	public String toString() {
+
+		ClassLoader cl = getClass().getClassLoader();
+
+		StringBuilder builder = new StringBuilder();
+		builder.append("\n=======================");
+		builder.append("Stream Config");
+		builder.append("=======================");
+		builder.append("\nTask name: " + getTaskName());
+		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
+		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
+		builder.append("\nOutput names: " + getOutputs(cl));
+		builder.append("\nPartitioning:");
+		for (String outputname : getOutputs(cl)) {
+			builder.append("\n\t" + outputname + ": "
+					+ getPartitioner(cl, outputname).getClass().getSimpleName());
+		}
+
+		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+
+		try {
+			builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName());
+		} catch (Exception e) {
+			builder.append("\nInvokable: Missing");
+		}
+		builder.append("\nBuffer timeout: " + getBufferTimeout());
+		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
+			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
+			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+		}
+
+		return builder.toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index b7e57e0..a95973b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -30,7 +30,7 @@ public class CollectorWrapper<OUT> implements Collector<OUT> {
 		this.outputs = new LinkedList<Collector<OUT>>();
 	}
 
-	public void addChainedOutput(Collector<OUT> output) {
+	public void addCollector(Collector<OUT> output) {
 		outputs.add(output);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 4c21564..6fd1b98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -17,11 +17,13 @@
 
 package org.apache.flink.streaming.api.collector;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.util.Collector;
 
 public class StreamOutput<OUT> implements Collector<SerializationDelegate<StreamRecord<OUT>>> {
@@ -52,6 +54,15 @@ public class StreamOutput<OUT> implements Collector<SerializationDelegate<Stream
 
 	@Override
 	public void close() {
+		if (output instanceof StreamRecordWriter) {
+			((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
+		} else {
+			try {
+				output.flush();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
 	}
 
 	public boolean isSelectAll() {
@@ -62,4 +73,8 @@ public class StreamOutput<OUT> implements Collector<SerializationDelegate<Stream
 		return selectedNames;
 	}
 
+	public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() {
+		return output;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
index fa374b1..c3e4c9d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.collector;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
@@ -80,11 +79,6 @@ public class StreamOutputWrapper<OUT> implements Collector<OUT> {
 		outputs.add(output);
 	}
 
-	protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-			List<String> outputNames, boolean isSelectAllOutput) {
-
-	}
-
 	/**
 	 * Collects and emits a tuple/object to the outputs by reusing a
 	 * StreamRecord object.

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 60a7b14..99f826d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -19,13 +19,15 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -45,83 +47,142 @@ public class OutputHandler<OUT> {
 
 	private StreamVertex<?, OUT> vertex;
 	private StreamConfig configuration;
-
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+	private ClassLoader cl;
 	private Collector<OUT> outerCollector;
 
-	TypeInformation<OUT> outTypeInfo = null;
-	StreamRecordSerializer<OUT> outSerializer = null;
-	SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
-
 	public List<ChainableInvokable<?, ?>> chainedInvokables;
 
-	private int numberOfChainedTasks;
+	private Map<String, StreamOutput<?>> outputMap;
+	private Map<String, StreamConfig> chainedConfigs;
+	private List<String> recordWriterOrder;
 
 	public OutputHandler(StreamVertex<?, OUT> vertex) {
+
+		// Initialize some fields
 		this.vertex = vertex;
-		this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
 		this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
-		this.numberOfChainedTasks = configuration.getNumberofChainedTasks();
+		this.outputMap = new HashMap<String, StreamOutput<?>>();
+		this.cl = vertex.getUserCodeClassLoader();
+
+		// We read the chained configs, and the order of record writer
+		// registrations by outputname
+		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+		this.recordWriterOrder = configuration.getRecordWriterOrder(cl);
+
+		// For the network outputs of the chain head we create the stream
+		// outputs
+		for (String outName : configuration.getOutputs(cl)) {
+			StreamOutput<?> streamOutput = createStreamOutput(outName, configuration);
+			outputMap.put(outName, streamOutput);
+		}
 
-		this.outerCollector = createChainedCollector(0);
+		// If we have chained tasks we iterate through them and create the
+		// stream outputs for the network outputs
+		if (chainedConfigs != null) {
+			for (StreamConfig config : chainedConfigs.values()) {
+				for (String outName : config.getOutputs(cl)) {
+					StreamOutput<?> streamOutput = createStreamOutput(outName, config);
+					outputMap.put(outName, streamOutput);
+				}
+			}
+		}
+
+		// We create the outer collector that will be passed to the first task
+		// in the chain
+		this.outerCollector = createChainedCollector(configuration);
 
 	}
 
-	public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
-		return outputs;
+	public Collection<StreamOutput<?>> getOutputs() {
+		return outputMap.values();
 	}
 
-	// We create the outer collector by nesting the chainable invokables into
-	// each other
+	/**
+	 * This method builds up a nested collector which encapsulates all the
+	 * chained operators and their network output. The result of this recursive
+	 * call will be passed as collector to the first invokable in the chain.
+	 * 
+	 * @param chainedTaskConfig
+	 *            The configuration of the starting operator of the chain, we
+	 *            use this paramater to recursively build the whole chain
+	 * @return Returns the collector for the chain starting from the given
+	 *         config
+	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private Collector<OUT> createChainedCollector(int chainedTaskIndex) {
+	private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
 
-		if (numberOfChainedTasks == chainedTaskIndex) {
-			// At the end of the chain we create the collector that sends data
-			// to the recordwriters
-			return createNetworkCollector();
-		} else {
+		// We create a wrapper that will encapsulate the chained operators and
+		// network outputs
+		CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>();
 
-			ChainableInvokable chainableInvokable = configuration.getChainedInvokable(
-					chainedTaskIndex, vertex.getUserCodeClassLoader());
+		// If the task has network outputs we create a collector for those and
+		// pass
+		// it to the wrapper
+		if (chainedTaskConfig.getNumberOfOutputs() > 0) {
+			wrapper.addCollector((Collector<OUT>) createNetworkCollector(chainedTaskConfig));
+		}
 
-			// The nesting is done by calling this method recursively when
-			// passing the collector to the invokable
-			chainableInvokable.setup(
-					createChainedCollector(chainedTaskIndex + 1),
-					configuration.getChainedInSerializer(chainedTaskIndex,
-							vertex.getUserCodeClassLoader()));
+		// If the task has chained outputs we create a chained collector for
+		// each of them and pass it to the wrapper
+		for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
+			wrapper.addCollector(createChainedCollector(chainedConfigs.get(output)));
+		}
 
-			// We hold a list of the chained invokables for initializaton
-			// afterwards
-			chainedInvokables.add(chainableInvokable);
+		if (chainedTaskConfig.isChainStart()) {
+			// The current task is the first chained task at this vertex so we
+			// return the wrapper
+			return wrapper;
+		} else {
+			// The current task is a part of the chain so we get the chainable
+			// invokable which will be returned and set it up using the wrapper
+			ChainableInvokable chainableInvokable = chainedTaskConfig.getUserInvokable(vertex
+					.getUserCodeClassLoader());
+			chainableInvokable.setup(wrapper,
+					chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
 
+			chainedInvokables.add(chainableInvokable);
 			return chainableInvokable;
 		}
 
 	}
 
-	private Collector<OUT> createNetworkCollector() {
+	/**
+	 * We create the collector for the network outputs of the task represented
+	 * by the config using the StreamOutputs that we have set up in the
+	 * constructor.
+	 * 
+	 * @param config
+	 *            The config of the task
+	 * @return We return a collector that represents all the network outputs of
+	 *         this task
+	 */
+	@SuppressWarnings("unchecked")
+	private <T> Collector<T> createNetworkCollector(StreamConfig config) {
+
+		StreamRecordSerializer<T> outSerializer = config
+				.getTypeSerializerOut1(vertex.userClassLoader);
+		SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
 
-		createOutSerializer();
+		if (outSerializer != null) {
+			outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
+			outSerializationDelegate.setInstance(outSerializer.createInstance());
+		}
 
-		StreamOutputWrapper<OUT> collector;
+		StreamOutputWrapper<T> collector;
 
 		if (vertex.configuration.isDirectedEmit()) {
-			OutputSelector<OUT> outputSelector = vertex.configuration
+			OutputSelector<T> outputSelector = vertex.configuration
 					.getOutputSelector(vertex.userClassLoader);
 
-			collector = new DirectedOutputWrapper<OUT>(vertex.getInstanceID(),
+			collector = new DirectedOutputWrapper<T>(vertex.getInstanceID(),
 					outSerializationDelegate, outputSelector);
 		} else {
-			collector = new StreamOutputWrapper<OUT>(vertex.getInstanceID(),
-					outSerializationDelegate);
+			collector = new StreamOutputWrapper<T>(vertex.getInstanceID(), outSerializationDelegate);
 		}
 
-		int numberOfOutputs = configuration.getNumberOfOutputs();
-		for (int i = 0; i < numberOfOutputs; i++) {
-			collector = (StreamOutputWrapper<OUT>) addStreamOutput(i, collector);
+		for (String output : config.getOutputs(cl)) {
+			collector.addOutput((StreamOutput<T>) outputMap.get(output));
 		}
 
 		return collector;
@@ -131,32 +192,35 @@ public class OutputHandler<OUT> {
 		return outerCollector;
 	}
 
-	void createOutSerializer() {
-		outSerializer = configuration.getTypeSerializerOut1(vertex.userClassLoader);
-		if (outSerializer != null) {
-			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
-			outSerializationDelegate.setInstance(outSerializer.createInstance());
-		}
-	}
+	/**
+	 * We create the StreamOutput for the specific output given by the name, and
+	 * the configuration of its source task
+	 * 
+	 * @param name
+	 *            Name of the output to which the streamoutput will be set up
+	 * @param configuration
+	 *            The config of upStream task
+	 * @return
+	 */
+	private <T> StreamOutput<T> createStreamOutput(String name, StreamConfig configuration) {
 
-	Collector<OUT> addStreamOutput(int outputNumber, StreamOutputWrapper<OUT> networkCollector) {
+		int outputNumber = recordWriterOrder.indexOf(name);
 
-		StreamPartitioner<OUT> outputPartitioner;
+		StreamPartitioner<T> outputPartitioner;
 
 		try {
-			outputPartitioner = configuration.getPartitioner(vertex.userClassLoader,
-					outputNumber);
+			outputPartitioner = configuration.getPartitioner(vertex.userClassLoader, name);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize partitioner for "
-					+ vertex.getName() + " with " + outputNumber + " outputs", e);
+					+ vertex.getName() + " with " + name + " outputs", e);
 		}
 
-		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+		RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
 
 		long bufferTimeout = configuration.getBufferTimeout();
 
 		if (bufferTimeout >= 0) {
-			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
 					.getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
 
 			if (LOG.isTraceEnabled()) {
@@ -164,7 +228,7 @@ public class OutputHandler<OUT> {
 						bufferTimeout, vertex.getClass().getSimpleName());
 			}
 		} else {
-			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+			output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
 					.getEnvironment().getWriter(outputNumber), outputPartitioner);
 
 			if (LOG.isTraceEnabled()) {
@@ -172,34 +236,28 @@ public class OutputHandler<OUT> {
 			}
 		}
 
-		outputs.add(output);
-
-		networkCollector.addOutput(new StreamOutput<OUT>(output, configuration
-				.isSelectAll(outputNumber) ? null : configuration.getOutputNames(outputNumber)));
+		StreamOutput<T> streamOutput = new StreamOutput<T>(output,
+				configuration.isSelectAll(name) ? null : configuration.getOutputNames(name));
 
 		if (LOG.isTraceEnabled()) {
 			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
 					.getSimpleName(), outputNumber, vertex.getClass().getSimpleName());
 		}
 
-		return networkCollector;
+		return streamOutput;
 	}
 
 	public void flushOutputs() throws IOException, InterruptedException {
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			if (output instanceof StreamRecordWriter) {
-				((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-			} else {
-				output.flush();
-			}
+		for (StreamOutput<?> streamOutput : getOutputs()) {
+			streamOutput.close();
 		}
 	}
 
 	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
 			throws IOException, InterruptedException {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoked with instance id {}", componentTypeName,
-					vertex.getName(), vertex.getInstanceID());
+			LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
+					vertex.getInstanceID());
 		}
 
 		try {
@@ -210,8 +268,8 @@ public class OutputHandler<OUT> {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
-					vertex.getName(), vertex.getInstanceID());
+			LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
+					vertex.getInstanceID());
 		}
 
 		flushOutputs();

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index 43b455e..cba23b8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -17,19 +17,22 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
+public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
@@ -72,6 +75,15 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
 		}
 
 		StreamRecord<OUT> nextRecord;
+		StreamRecordSerializer<OUT> serializer = configuration
+				.getTypeSerializerOut1(userClassLoader);
+		SerializationDelegate<StreamRecord<OUT>> serializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+				serializer);
+
+		List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
+		for (StreamOutput<?> output : outputHandler.getOutputs()) {
+			outputs.add((StreamOutput<OUT>) output);
+		}
 
 		while (true) {
 			if (shouldWait) {
@@ -82,10 +94,9 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
 			if (nextRecord == null) {
 				break;
 			}
-			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
-					.getOutputs()) {
-				outputHandler.outSerializationDelegate.setInstance(nextRecord);
-				output.emit(outputHandler.outSerializationDelegate);
+			for (StreamOutput<OUT> output : outputs) {
+				serializationDelegate.setInstance(nextRecord);
+				output.collect(serializationDelegate);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 53b75a0..698b193 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -78,6 +79,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
         " "  +
         "parallelism.")
   }
+  
+  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for operators.")
+    }
+    this
+  }
 
   /**
    * Creates a new DataStream by merging DataStream outputs of


Mime
View raw message