flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [9/9] flink git commit: [hotfix] Cleanup routing of records in OperatorChain
Date Mon, 08 Feb 2016 23:06:13 GMT
[hotfix] Cleanup routing of records in OperatorChain


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c6254e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c6254e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c6254e

Branch: refs/heads/master
Commit: 28c6254ee385fe746e868a81b2207bf66b552174
Parents: e9c83ea
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 8 16:14:00 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 8 20:36:35 2016 +0100

----------------------------------------------------------------------
 .../BroadcastOutputSelectorWrapper.java         |  45 -------
 .../api/collector/selector/DirectedOutput.java  | 130 +++++++++++++++++++
 .../selector/DirectedOutputSelectorWrapper.java |  97 --------------
 .../selector/OutputSelectorWrapper.java         |   9 +-
 .../selector/OutputSelectorWrapperFactory.java  |  33 -----
 .../flink/streaming/api/graph/StreamConfig.java |  20 +--
 .../flink/streaming/api/graph/StreamNode.java   |  10 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../streaming/runtime/io/CollectorWrapper.java  |  61 ---------
 .../streaming/runtime/tasks/OperatorChain.java  |  84 ++++++++++--
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   9 +-
 12 files changed, 225 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
deleted file mode 100644
index 7034b11..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
-
-	public BroadcastOutputSelectorWrapper() {
-		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
-	}
-	
-	@Override
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
-		outputs.add(output);
-	}
-
-	@Override
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
-		return outputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
new file mode 100644
index 0000000..52c50b3
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
+	
+	private final OutputSelector<OUT>[] outputSelectors;
+
+	private final Output<StreamRecord<OUT>>[] selectAllOutputs;
+	
+	private final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap;
+	
+	private final Output<StreamRecord<OUT>>[] allOutputs;
+
+	
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public DirectedOutput(
+			List<OutputSelector<OUT>> outputSelectors,
+			List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs)
+	{
+		this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);
+
+		this.allOutputs = new Output[outputs.size()];
+		for (int i = 0; i < outputs.size(); i++) {
+			allOutputs[i] = outputs.get(i).f0;
+		}
+		
+		
+		HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>();
+		HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
+		
+		for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) {
+			final Output<StreamRecord<OUT>> output = outputPair.f0;
+			final StreamEdge edge = outputPair.f1;
+	
+			List<String> selectedNames = edge.getSelectedNames();
+
+			if (selectedNames.isEmpty()) {
+				selectAllOutputs.add(output);
+			}
+			else {
+				for (String selectedName : selectedNames) {
+					if (!outputMap.containsKey(selectedName)) {
+						outputMap.put(selectedName, new ArrayList<Output<StreamRecord<OUT>>>());
+						outputMap.get(selectedName).add(output);
+					}
+					else {
+						if (!outputMap.get(selectedName).contains(output)) {
+							outputMap.get(selectedName).add(output);
+						}
+					}
+				}
+			}
+		}
+		
+		this.selectAllOutputs = selectAllOutputs.toArray(new Output[selectAllOutputs.size()]);
+		
+		this.outputMap = new HashMap<>();
+		for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> entry : outputMap.entrySet()) {
+			Output<StreamRecord<OUT>>[] arr = entry.getValue().toArray(new Output[entry.getValue().size()]);
+			this.outputMap.put(entry.getKey(), arr);
+		}
+	}
+
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		for (Output<StreamRecord<OUT>> out : allOutputs) {
+			out.emitWatermark(mark);
+		}
+	}
+
+	@Override
+	public void collect(StreamRecord<OUT> record) {
+		Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<Output<StreamRecord<OUT>>>(selectAllOutputs.length);
+		Collections.addAll(selectedOutputs, selectAllOutputs);
+
+		for (OutputSelector<OUT> outputSelector : outputSelectors) {
+			Iterable<String> outputNames = outputSelector.select(record.getValue());
+
+			for (String outputName : outputNames) {
+				Output<StreamRecord<OUT>>[] outputList = outputMap.get(outputName);
+				if (outputList != null) {
+					Collections.addAll(selectedOutputs, outputList);
+				}
+			}
+		}
+		
+		for (Output<StreamRecord<OUT>> out : selectedOutputs) {
+			out.collect(record);
+		}
+	}
+
+	@Override
+	public void close() {
+		for (Output<StreamRecord<OUT>> out : allOutputs) {
+			out.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
deleted file mode 100644
index 84558fc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
-
-	private List<OutputSelector<OUT>> outputSelectors;
-
-	private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
-	private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
-
-	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
-		this.outputSelectors = outputSelectors;
-		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
-		this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
-	}
-	
-	@Override
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
-		List<String> selectedNames = edge.getSelectedNames();
-
-		if (selectedNames.isEmpty()) {
-			selectAllOutputs.add(output);
-		}
-		else {
-			for (String selectedName : selectedNames) {
-				if (!outputMap.containsKey(selectedName)) {
-					outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
-					outputMap.get(selectedName).add(output);
-				}
-				else {
-					if (!outputMap.get(selectedName).contains(output)) {
-						outputMap.get(selectedName).add(output);
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
-		Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
-
-		for (OutputSelector<OUT> outputSelector : outputSelectors) {
-			Iterable<String> outputNames = outputSelector.select(record);
-
-			for (String outputName : outputNames) {
-				List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
-
-				try {
-					selectedOutputs.addAll(outputList);
-				} catch (NullPointerException e) {
-					if (LOG.isErrorEnabled()) {
-						String format = String.format(
-								"Cannot emit because no output is selected with the name: %s",
-								outputName);
-						LOG.error(format);
-					}
-				}
-			}
-		}
-
-		return selectedOutputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index f25c995..971e42b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -19,14 +19,7 @@ package org.apache.flink.streaming.api.collector.selector;
 
 import java.io.Serializable;
 
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
 public interface OutputSelectorWrapper<OUT> extends Serializable {
 
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
-
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
-
+	void sendOutputs(OUT record);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
deleted file mode 100644
index dca2ede..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.List;
-
-public class OutputSelectorWrapperFactory {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
-		if (outputSelectors.size() == 0) {
-			return new BroadcastOutputSelectorWrapper();
-		} else {
-			return new DirectedOutputSelectorWrapper(outputSelectors);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 7a07c79..311b7fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
@@ -38,7 +39,7 @@ import org.apache.flink.util.InstantiationUtil;
 public class StreamConfig implements Serializable {
 
 	private static final long serialVersionUID = 1L;
-
+	
 	// ------------------------------------------------------------------------
 	//  Config Keys
 	// ------------------------------------------------------------------------
@@ -191,19 +192,22 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
+	public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) {
 		try {
-			InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER);
+			InstantiationUtil.writeObjectToConfig(outputSelectors, this.config, OUTPUT_SELECTOR_WRAPPER);
 		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
+			throw new StreamTaskException("Could not serialize output selectors", e);
 		}
 	}
 	
-	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
+	public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader userCodeClassloader) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
+			List<OutputSelector<T>> selectors = 
+					InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, userCodeClassloader);
+			return selectors == null ? Collections.<OutputSelector<T>>emptyList() : selectors;
+			
 		} catch (Exception e) {
-			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
+			throw new StreamTaskException("Could not read output selectors", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0a612f3..3e06037 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -26,15 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
- * Class representing the operators in the streaming programs, with all their
- * properties.
- * 
+ * Class representing the operators in the streaming programs, with all their properties.
  */
 public class StreamNode implements Serializable {
 
@@ -168,10 +164,6 @@ public class StreamNode implements Serializable {
 		return outputSelectors;
 	}
 
-	public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
-		return OutputSelectorWrapperFactory.create(getOutputSelectors());
-	}
-
 	public void addOutputSelector(OutputSelector<?> outputSelector) {
 		this.outputSelectors.add(outputSelector);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c0d2856..c810e47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -310,7 +310,7 @@ public class StreamingJobGraphGenerator {
 		config.setTypeSerializerOut(vertex.getTypeSerializerOut());
 
 		config.setStreamOperator(vertex.getOperator());
-		config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
+		config.setOutputSelectors(vertex.getOutputSelectors());
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
deleted file mode 100644
index 01e997d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
-
-	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
-
-	private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
-
-	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
-		this.outputSelectorWrapper = outputSelectorWrapper;
-		allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
-	}
-	
-	public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) {
-		outputSelectorWrapper.addCollector(output, edge);
-		allOutputs.add(output);
-	}
-
-	@Override
-	public void collect(StreamRecord<OUT> record) {
-		for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
-			output.collect(record);
-		}
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		for (Output<?> output : allOutputs) {
-			output.emitWatermark(mark);
-		}
-	}
-
-	@Override
-	public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 125279c..5313bc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -24,15 +24,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -45,6 +46,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The {@code OperatorChain} contains all operators that are executed as one chain within a single
+ * {@link StreamTask}.
+ * 
+ * @param <OUT> The type of elements accepted by the chain, i.e., the input type of the chain's
+ *              head operator.
+ */
 public class OperatorChain<OUT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
@@ -182,15 +190,14 @@ public class OperatorChain<OUT> {
 			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
 			List<StreamOperator<?>> allOperators)
 	{
-		// We create a wrapper that will encapsulate the chained operators and network outputs
-		OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
-		CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
-
+		List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
+		
 		// create collectors for the network outputs
 		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
 			@SuppressWarnings("unchecked")
 			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
-			wrapper.addCollector(output, outputEdge);
+			
+			allOutputs.add(new Tuple2<Output<StreamRecord<T>>, StreamEdge>(output, outputEdge));
 		}
 
 		// Create collectors for the chained outputs
@@ -200,9 +207,37 @@ public class OperatorChain<OUT> {
 
 			Output<StreamRecord<T>> output = createChainedOperator(
 					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-			wrapper.addCollector(output, outputEdge);
+			
+			allOutputs.add(new Tuple2<>(output, outputEdge));
+		}
+		
+		// if there are multiple outputs, or the outputs are directed, we need to
+		// wrap them as one output
+		
+		List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
+		
+		if (selectors == null || selectors.isEmpty()) {
+			// simple path, no selector necessary
+			if (allOutputs.size() == 1) {
+				return allOutputs.get(0).f0;
+			}
+			else {
+				// send to N outputs. Note that this includes the special case
+				// of sending to zero outputs
+				@SuppressWarnings({"unchecked", "rawtypes"})
+				Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
+				for (int i = 0; i < allOutputs.size(); i++) {
+					asArray[i] = allOutputs.get(i).f0;
+				}
+				
+				return new BroadcastingOutputCollector<T>(asArray);
+			}
+		}
+		else {
+			// selector present, more complex routing necessary
+			return new DirectedOutput<T>(selectors, allOutputs);
+			
 		}
-		return wrapper;
 	}
 	
 	private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
@@ -309,7 +344,6 @@ public class OperatorChain<OUT> {
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
-
 				StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
 
 				operator.setKeyContextElement1(copy);
@@ -320,4 +354,34 @@ public class OperatorChain<OUT> {
 			}
 		}
 	}
+	
+	private static final class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> {
+		
+		private final Output<StreamRecord<T>>[] outputs;
+		
+		public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+			this.outputs = outputs;
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			for (Output<StreamRecord<T>> output : outputs) {
+				output.emitWatermark(mark);
+			}
+		}
+
+		@Override
+		public void collect(StreamRecord<T> record) {
+			for (Output<StreamRecord<T>> output : outputs) {
+				output.collect(record);
+			}
+		}
+
+		@Override
+		public void close() {
+			for (Output<StreamRecord<T>> output : outputs) {
+				output.close();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 8525d37..5126d11 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
 import org.junit.Test;
 
 public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 2cca3ff..e32b304 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -26,7 +27,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleIn
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -40,11 +40,11 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-
 /**
  * Test harness for testing a {@link StreamTask}.
  *
@@ -91,6 +91,7 @@ public class StreamTaskTestHarness<OUT> {
 	// input related methods only need to be implemented once, in generic form
 	protected int numInputGates;
 	protected int numInputChannelsPerGate;
+	
 	@SuppressWarnings("rawtypes")
 	protected StreamTestSingleInputGate[] inputGates;
 
@@ -128,7 +129,7 @@ public class StreamTaskTestHarness<OUT> {
 
 		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
 
-		streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
+		streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
 		streamConfig.setNumberOfOutputs(1);
 
 		StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {


Mime
View raw message