flink-commits mailing list archives

From: gyf...@apache.org
Subject: flink git commit: [FLINK-1381] Allow multiple output splitters for single stream operator
Date: Sun, 25 Jan 2015 16:41:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 095dc4a54 -> 7ce9a8ff9


[FLINK-1381] Allow multiple output splitters for single stream operator

Closes #332

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ce9a8ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ce9a8ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ce9a8ff

Branch: refs/heads/master
Commit: 7ce9a8ff90d1ec7fff8d65c24c58e11a0aa6f445
Parents: 095dc4a
Author: mingliang <qmlmoon@gmail.com>
Authored: Fri Jan 23 10:59:02 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sun Jan 25 15:24:40 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  17 +-
 .../apache/flink/streaming/api/StreamGraph.java |  13 +-
 .../api/StreamingJobGraphGenerator.java         |   2 +-
 .../api/collector/DirectedCollectorWrapper.java |  43 +++--
 .../streaming/api/datastream/DataStream.java    |  30 ++-
 .../datastream/SingleOutputStreamOperator.java  |  23 ---
 .../api/datastream/SplitDataStream.java         |   5 +-
 .../api/streamvertex/OutputHandler.java         |   2 +-
 .../flink/streaming/api/OutputSplitterTest.java | 185 +++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |   7 +-
 10 files changed, 257 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
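
For context (not part of the commit), a minimal Java sketch of the behaviour this change enables: the same DataStream can now be split by several independent OutputSelectors, whereas the removed SingleOutputStreamOperator#split threw "Currently operators can only be split once" on a second call. Class names and the environment setup follow the new OutputSplitterTest; the sink bodies are placeholders.

import java.util.Arrays;

import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;

public class MultiSplitExample {
	public static void main(String[] args) throws Exception {
		// 1 degree of parallelism, 32 MB memory, as in OutputSplitterTest
		StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32);
		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

		// First splitter: route records by parity.
		ds.split(new OutputSelector<Integer>() {
			@Override
			public Iterable<String> select(Integer value) {
				return Arrays.asList(value % 2 == 0 ? "even" : "odd");
			}
		}).select("even").addSink(new SinkFunction<Integer>() {
			@Override
			public void invoke(Integer value) {
				// e.g. collect even values
			}
		});

		// Second splitter on the same operator: allowed after this commit,
		// since the StreamGraph now keeps a List<OutputSelector<?>> per vertex.
		ds.split(new OutputSelector<Integer>() {
			@Override
			public Iterable<String> select(Integer value) {
				return Arrays.asList(value % 4 == 0 ? "yes" : "no");
			}
		}).select("yes").addSink(new SinkFunction<Integer>() {
			@Override
			public void invoke(Integer value) {
				// e.g. collect multiples of four
			}
		});

		env.execute();
	}
}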


http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 94349d7..1d51216 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -186,17 +186,22 @@ public class StreamConfig implements Serializable {
 		return config.getBoolean(DIRECTED_EMIT, false);
 	}
 
-	public void setOutputSelector(OutputSelector<?> outputSelector) {
-		if (outputSelector != null) {
-			setDirectedEmit(true);
-			config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize(outputSelector));
+
+	public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+		try {
+			if (outputSelector != null) {
+				setDirectedEmit(true);
+				config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize((Serializable) outputSelector));
+			}
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
 		}
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
+	public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
 		try {
-			return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+			return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config,
 					OUTPUT_SELECTOR, cl);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index c9ecd55..2a0d0c7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -66,8 +66,8 @@ public class StreamGraph {
 	private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
 	private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
 	private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-	private Map<String, OutputSelector<?>> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses;
+	private Map<String, List<OutputSelector<?>>> outputSelectors;
 	private Map<String, Integer> iterationIds;
 	private Map<Integer, String> iterationIDtoHeadName;
 	private Map<Integer, String> iterationIDtoTailName;
@@ -101,7 +101,7 @@ public class StreamGraph {
 		typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
 		typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
 		typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
-		outputSelectors = new HashMap<String, OutputSelector<?>>();
+		outputSelectors = new HashMap<String, List<OutputSelector<?>>>();
 		jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
 		iterationIds = new HashMap<String, Integer>();
 		iterationIDtoHeadName = new HashMap<Integer, String>();
@@ -272,6 +272,7 @@ public class StreamGraph {
 		outEdgeLists.put(vertexName, new ArrayList<String>());
 		outEdgeTypes.put(vertexName, new ArrayList<Integer>());
 		selectedNames.put(vertexName, new ArrayList<List<String>>());
+		outputSelectors.put(vertexName, new ArrayList<OutputSelector<?>>());
 		inEdgeLists.put(vertexName, new ArrayList<String>());
 		outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>());
 		iterationTailCount.put(vertexName, 0);
@@ -385,10 +386,10 @@ public class StreamGraph {
 	 * @param vertexName
 	 *            Name of the vertex for which the output selector will be set
 	 * @param outputSelector
-	 *            The outputselector object
+	 *            The user defined output selector.
 	 */
-	public void setOutputSelector(String vertexName, OutputSelector<?> outputSelector) {
-		outputSelectors.put(vertexName, outputSelector);
+	public <T> void setOutputSelector(String vertexName, OutputSelector<T> outputSelector) {
+		outputSelectors.get(vertexName).add(outputSelector);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Outputselector set for {}", vertexName);
@@ -520,7 +521,7 @@ public class StreamGraph {
 		return inputFormatLists.get(vertexName);
 	}
 
-	public OutputSelector<?> getOutputSelector(String vertexName) {
+	public List<OutputSelector<?>> getOutputSelector(String vertexName) {
 		return outputSelectors.get(vertexName);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 3b2d135..fccb1e1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -191,7 +191,7 @@ public class StreamingJobGraphGenerator {
 		config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
 
 		config.setUserInvokable(streamGraph.getInvokable(vertexName));
-		config.setOutputSelector(streamGraph.getOutputSelector(vertexName));
+		config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
 		config.setOperatorStates(streamGraph.getState(vertexName));
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
index 66fb667..4681cd3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
@@ -40,7 +40,7 @@ public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class);
 
-	OutputSelector<OUT> outputSelector;
+	List<OutputSelector<OUT>> outputSelectors;
 
 	protected Map<String, List<Collector<OUT>>> outputMap;
 
@@ -53,8 +53,8 @@ public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
 	 * @param outputSelector
 	 *            User defined {@link OutputSelector}
 	 */
-	public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) {
-		this.outputSelector = outputSelector;
+	public DirectedCollectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
+		this.outputSelectors = outputSelectors;
 		this.emitted = new HashSet<Collector<OUT>>();
 		this.selectAllOutputs = new LinkedList<Collector<OUT>>();
 		this.outputMap = new HashMap<String, List<Collector<OUT>>>();
@@ -91,34 +91,37 @@ public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
 	public void collect(OUT record) {
 		emitted.clear();
 
-		Iterable<String> outputNames = outputSelector.select(record);
-
 		for (Collector<OUT> output : selectAllOutputs) {
 			output.collect(record);
 			emitted.add(output);
 		}
 
-		for (String outputName : outputNames) {
-			List<Collector<OUT>> outputList = outputMap.get(outputName);
-			if (outputList == null) {
-				if (LOG.isErrorEnabled()) {
-					String format = String.format(
-							"Cannot emit because no output is selected with the name: %s",
-							outputName);
-					LOG.error(format);
+		for (OutputSelector<OUT> outputSelector : outputSelectors) {
+			Iterable<String> outputNames = outputSelector.select(record);
 
-				}
-			} else {
-				for (Collector<OUT> output : outputList) {
-					if (!emitted.contains(output)) {
-						output.collect(record);
-						emitted.add(output);
+			for (String outputName : outputNames) {
+				List<Collector<OUT>> outputList = outputMap.get(outputName);
+				if (outputList == null) {
+					if (LOG.isErrorEnabled()) {
+						String format = String.format(
+								"Cannot emit because no output is selected with the name: %s",
+								outputName);
+						LOG.error(format);
+
+					}
+				} else {
+					for (Collector<OUT> output : outputList) {
+						if (!emitted.contains(output)) {
+							output.collect(record);
+							emitted.add(output);
+						}
 					}
+
 				}
 
 			}
-
 		}
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0f633ec..b30d261 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.StreamGraph;
+import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -225,6 +226,25 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Operator used for directing tuples to specific named outputs using an
+	 * {@link org.apache.flink.streaming.api.collector.OutputSelector}. Calling
+	 * this method on an operator creates a new {@link SplitDataStream}.
+	 * 
+	 * @param outputSelector
+	 *            The user defined
+	 *            {@link org.apache.flink.streaming.api.collector.OutputSelector}
+	 *            for directing the tuples.
+	 * @return The {@link SplitDataStream}
+	 */
+	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+		for (DataStream<OUT> ds : this.mergedStreams) {
+			streamGraph.setOutputSelector(ds.getId(), clean(outputSelector));
+		}
+
+		return new SplitDataStream<OUT>(this);
+	}
+
+	/**
 	 * Creates a new {@link ConnectedDataStream} by connecting
 	 * {@link DataStream} outputs of different type with each other. The
 	 * DataStreams connected using this operators can be used with CoFunctions.
@@ -382,8 +402,7 @@ public class DataStream<OUT> {
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. A common usage pattern for streaming iterations is to use
 	 * output splitting to send a part of the closing data stream to the head.
-	 * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-	 * more information.
+	 * Refer to {@link #split(OutputSelector)} for more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head.
@@ -408,8 +427,7 @@ public class DataStream<OUT> {
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. A common usage pattern for streaming iterations is to use
 	 * output splitting to send a part of the closing data stream to the head.
-	 * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-	 * more information.
+	 * Refer to {@link #split(OutputSelector)} for more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head.
@@ -1176,8 +1194,8 @@ public class DataStream<OUT> {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
 				sinkInvokable);
 
-		streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null,
-				"sink", degreeOfParallelism);
+		streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, "sink",
+				degreeOfParallelism);
 
 		this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index dbfbc48..dcfd6fe 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,7 +22,6 @@ import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -101,28 +100,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	/**
-	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link OutputSelector}. Calling this method on an operator creates a new
-	 * {@link SplitDataStream}.
-	 * 
-	 * @param outputSelector
-	 *            The user defined {@link OutputSelector} for directing the
-	 *            tuples.
-	 * @return The {@link SplitDataStream}
-	 */
-	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		if (!isSplit) {
-			this.isSplit = true;
-			streamGraph.setOutputSelector(id, clean(outputSelector));
-
-			return new SplitDataStream<OUT>(this);
-		} else {
-			throw new RuntimeException("Currently operators can only be split once");
-		}
-
-	}
-
-	/**
 	 * This is a beta feature </br></br> Register an operator state for this
 	 * operator by the given name. This name can be used to retrieve the state
 	 * during runtime using {@link StreamingRuntimeContext#getState(String)}. To

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 2b5b7c5..97458a8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -56,7 +56,10 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 		}
 
 		DataStream<OUT> returnStream = copy();
-		returnStream.userDefinedNames = Arrays.asList(outputNames);
+
+		for (DataStream<OUT> ds : returnStream.mergedStreams) {
+			ds.userDefinedNames = Arrays.asList(outputNames);
+		}
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 0d60939..1a12cb2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -108,7 +108,7 @@ public class OutputHandler<OUT> {
 		// We create a wrapper that will encapsulate the chained operators and
 		// network outputs
 		CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper(
-				chainedTaskConfig.getOutputSelector(cl)) : new CollectorWrapper<OUT>();
+				chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
 
 		// Create collectors for the network outputs
 		for (String output : chainedTaskConfig.getOutputs(cl)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
new file mode 100644
index 0000000..2486715
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class OutputSplitterTest {
+
+	private static final long MEMORYSIZE = 32;
+
+	private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
+	private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
+
+
+	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOnMergedDataStream() throws Exception {
+		splitterResult1.clear();
+		splitterResult2.clear();
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
+		DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
+
+		d1 = d1.merge(d2);
+
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 8354166915727490130L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value > 4) {
+					s.add(">");
+				} else {
+					s.add("<");
+				}
+				return s;
+			}
+		}).select(">").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = 5827187510526388104L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult1.add(value);
+			}
+		});
+
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = -6822487543355994807L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 3 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(new SinkFunction<Integer>() {
+			private static final long serialVersionUID = -2674335071267854599L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult2.add(value);
+			}
+		});
+		env.execute();
+
+		Collections.sort(splitterResult1);
+		Collections.sort(splitterResult2);
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
+		assertEquals(expectedSplitterResult, splitterResult1);
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
+		assertEquals(expectedSplitterResult, splitterResult2);
+	}
+
+	@Test
+	public void testOnSingleDataStream() throws Exception {
+		splitterResult1.clear();
+		splitterResult2.clear();
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
+
+		ds.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 2524335410904414121L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 2 == 0) {
+					s.add("even");
+				} else {
+					s.add("odd");
+				}
+				return s;
+			}
+		}).select("even").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = -2995092337537209535L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult1.add(value);
+			}
+		});
+
+		ds.split(new OutputSelector<Integer>() {
+
+			private static final long serialVersionUID = -511693919586034092L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 4 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = -1749077049727705424L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult2.add(value);
+			}
+		});
+		env.execute();
+
+		Collections.sort(splitterResult1);
+		Collections.sort(splitterResult2);
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
+		assertEquals(expectedSplitterResult, splitterResult1);
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0,4,8));
+		assertEquals(expectedSplitterResult, splitterResult2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 698b193..177a9ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -468,12 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * OutputSelector. Calling this method on an operator creates a new
    * SplitDataStream.
    */
-  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
-    case _ =>
-      throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
-        "split.")
-  }
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
 
   /**
    * Creates a new SplitDataStream that contains only the elements satisfying the

