flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [01/13] flink git commit: [FLINK-2550] Rename SplitDataStream to SplitStream
Date Mon, 05 Oct 2015 14:42:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master 68c1afc20 -> 9513f0e33


[FLINK-2550] Rename SplitDataStream to SplitStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9513f0e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9513f0e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9513f0e3

Branch: refs/heads/master
Commit: 9513f0e33f7aba46ebcc322d51ef12f0302ec2c2
Parents: 7b6e762
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Oct 5 14:20:33 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../api/FlinkTopologyBuilder.java               |  7 ++-
 .../util/SplitStreamMapper.java                 |  4 +-
 .../split/SpoutSplitExample.java                |  4 +-
 .../api/collector/selector/OutputSelector.java  |  6 +-
 .../streaming/api/datastream/DataStream.java    |  8 +--
 .../api/datastream/SplitDataStream.java         | 62 --------------------
 .../streaming/api/datastream/SplitStream.java   | 62 ++++++++++++++++++++
 .../flink/streaming/api/DataStreamTest.java     |  4 +-
 .../apache/flink/streaming/api/IterateTest.java |  6 +-
 .../streaming/api/StreamingOperatorsITCase.java |  4 +-
 .../api/collector/DirectedOutputTest.java       |  4 +-
 .../api/complex/ComplexIntegrationTest.java     |  4 +-
 .../examples/iteration/IterateExample.java      |  4 +-
 .../flink/streaming/api/scala/DataStream.scala  |  8 +--
 .../streaming/api/scala/SplitDataStream.scala   | 37 ------------
 .../flink/streaming/api/scala/SplitStream.scala | 37 ++++++++++++
 .../flink/streaming/api/scala/package.scala     |  6 +-
 .../StreamingScalaAPICompletenessTest.scala     |  6 +-
 18 files changed, 137 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e2d819c..e4f6c94 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -1,4 +1,5 @@
 /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,7 +45,7 @@ import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -112,7 +113,7 @@ public class FlinkTopologyBuilder {
 			} else {
 				source = env.addSource(spoutWrapper, spoutId,
 						TypeExtractor.getForClass(SplitStreamType.class));
-				SplitDataStream splitSource = source.split(new FlinkStormStreamSelector());
+				SplitStream splitSource = source.split(new FlinkStormStreamSelector());
 
 				for (String streamId : sourceStreams.keySet()) {
 					outputStreams.put(streamId, splitSource.select(streamId));
@@ -246,7 +247,7 @@ public class FlinkTopologyBuilder {
 										new StormBoltWrapper(userBolt, this.outputStreams.get(
 												producerId).get(inputStreamId)));
 
-								SplitDataStream splitStreams = outputStream
+								SplitStream splitStreams = outputStream
 										.split(new FlinkStormStreamSelector());
 
 								HashMap<String, DataStream> op = new HashMap<String, DataStream>();

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
index afcdcae..9cb44ec 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
@@ -18,13 +18,13 @@ package org.apache.flink.stormcompatibility.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 
 /**
  * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record
of type {@code T}. Can be used to get
  * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams
got separated using
  * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)
.split(...)} and
- * {@link SplitDataStream#select(String...) .select(...)}).
+ * {@link SplitStream#select(String...) .select(...)}).
  * 
  * @param <T>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
index 4116f3c..18251d4 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
@@ -28,7 +28,7 @@ import org.apache.flink.stormcompatibility.util.SplitStreamType;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
@@ -60,7 +60,7 @@ public class SpoutSplitExample {
 				new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
 						rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
 
-		SplitDataStream<SplitStreamType<Integer>> splitStream = numbers
+		SplitStream<SplitStreamType<Integer>> splitStream = numbers
 				.split(new FlinkStormStreamSelector<Integer>());
 
 		DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
index b886fa6..9c6eede 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@@ -20,12 +20,12 @@ package org.apache.flink.streaming.api.collector.selector;
 import java.io.Serializable;
 
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 
 /**
- * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * Interface for defining an OutputSelector for a {@link SplitStream} using
  * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitDataStream} will run through this operator to select outputs.
+ * {@link SplitStream} will run through this operator to select outputs.
  * 
  * @param <OUT>
  *            Type parameter of the split values.

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 003ef36..8de1a0d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -204,16 +204,16 @@ public class DataStream<T> {
 	/**
 	 * Operator used for directing tuples to specific named outputs using an
 	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
-	 * Calling this method on an operator creates a new {@link SplitDataStream}.
+	 * Calling this method on an operator creates a new {@link SplitStream}.
 	 * 
 	 * @param outputSelector
 	 *            The user defined
 	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
 	 *            for directing the tuples.
-	 * @return The {@link SplitDataStream}
+	 * @return The {@link SplitStream}
 	 */
-	public SplitDataStream<T> split(OutputSelector<T> outputSelector) {
-		return new SplitDataStream<T>(this, clean(outputSelector));
+	public SplitStream<T> split(OutputSelector<T> outputSelector) {
+		return new SplitStream<T>(this, clean(outputSelector));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
deleted file mode 100644
index bc9ecfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-
-/**
- * The SplitDataStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply transformation on the whole output simply
- * call the transformation on the SplitDataStream
- *
- * @param <OUT> The type of the elements in the Stream
- */
-public class SplitDataStream<OUT> extends DataStream<OUT> {
-
-	protected SplitDataStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector)
{
-		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(),
outputSelector));
-	}
-
-	/**
-	 * Sets the output names for which the next operator will receive values.
-	 * 
-	 * @param outputNames
-	 *            The output names for which the operator will receive the
-	 *            input.
-	 * @return Returns the selected DataStream
-	 */
-	public DataStream<OUT> select(String... outputNames) {
-		return selectOutput(outputNames);
-	}
-
-	private DataStream<OUT> selectOutput(String[] outputNames) {
-		for (String outName : outputNames) {
-			if (outName == null) {
-				throw new RuntimeException("Selected names must not be null");
-			}
-		}
-
-		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(),
Lists.newArrayList(outputNames));
-		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
new file mode 100644
index 0000000..11ee7f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
+
+/**
+ * The SplitStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function. To apply transformation on the whole output simply
+ * call the transformation on the SplitStream
+ *
+ * @param <OUT> The type of the elements in the Stream
+ */
+public class SplitStream<OUT> extends DataStream<OUT> {
+
+	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector)
{
+		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(),
outputSelector));
+	}
+
+	/**
+	 * Sets the output names for which the next operator will receive values.
+	 * 
+	 * @param outputNames
+	 *            The output names for which the operator will receive the
+	 *            input.
+	 * @return Returns the selected DataStream
+	 */
+	public DataStream<OUT> select(String... outputNames) {
+		return selectOutput(outputNames);
+	}
+
+	private DataStream<OUT> selectOutput(String[] outputNames) {
+		for (String outName : outputNames) {
+			if (outName == null) {
+				throw new RuntimeException("Selected names must not be null");
+			}
+		}
+
+		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(),
Lists.newArrayList(outputNames));
+		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 55bf889..0b8482d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
@@ -457,7 +457,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase
{
 			}
 		};
 
-		SplitDataStream<Integer> split = unionFilter.split(outputSelector);
+		SplitStream<Integer> split = unionFilter.split(outputSelector);
 		split.select("dummy").addSink(new NoOpSink<Integer>());
 		List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
 		assertEquals(1, outputSelectors.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 7bdebf8..bd97e84 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -212,7 +212,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM
/ 2).addSink(new ReceiveCheckNoOpSink<Integer>());
 		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
 
-		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
 				.map(NoOpIntMap).name("EvenOddSourceMap")
 				.split(new EvenOddOutputSelector());
 
@@ -295,7 +295,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 				.addSink(new ReceiveCheckNoOpSink<Integer>());
 		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
 
-		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
 				.map(NoOpIntMap)
 				.name("split")
 				.split(new EvenOddOutputSelector());

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 6401546..42febea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -77,7 +77,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements,
numKeys));
 
-		SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
+		SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
 			.keyBy(0)
 			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index b7df2ec..d2e24c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -102,7 +102,7 @@ public class DirectedOutputTest extends StreamingMultipleProgramsTestBase
{
 		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
 		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
 
-		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+		SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(evenSink);
 		source.select(ODD, TEN).addSink(oddAndTenSink);
 		source.select(EVEN, ODD).addSink(evenAndOddSink);

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index d35c9bd..5e46508 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -138,7 +138,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase
{
 			}
 		}).iterate(5000);
 
-		SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new
IncrementMap()).split(new
+		SplitStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
 				MyOutputSelector());
 		it.closeWith(step.select("iterate"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index af19af7..2cf66b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
@@ -74,7 +74,7 @@ public class IterateExample {
 
 		// apply the step function to get the next Fibonacci number
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step =
it.map(new Step())
+		SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new
Step())
 				.split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 6ad7629..0cf1df8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -729,15 +729,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * Operator used for directing tuples to specific named outputs using an
    * OutputSelector. Calling this method on an operator creates a new
-   * SplitDataStream.
+   * [[SplitStream]].
    */
-  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
+  def split(selector: OutputSelector[T]): SplitStream[T] = javaStream.split(selector)
 
   /**
-   * Creates a new SplitDataStream that contains only the elements satisfying the
+   * Creates a new [[SplitStream]] that contains only the elements satisfying the
    *  given output selector predicate.
    */
-  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
+  def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
deleted file mode 100644
index 105d2c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream
}
-
-/**
- * The SplitDataStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply a transformation on the whole output simply call
- * the appropriate method on this stream.
- *
- */
-class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
-
-  /**
-   *  Sets the output names for which the next operator will receive values.
-   */
-  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
new file mode 100644
index 0000000..deea6f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
+
+/**
+ * The SplitStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function. To apply a transformation on the whole output simply call
+ * the appropriate method on this stream.
+ *
+ */
+class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
+
+  /**
+   *  Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 625678a..d65ea41 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreato
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream
}
+import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams
}
 import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
 import language.implicitConversions
@@ -44,8 +44,8 @@ package object scala {
   implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R]
=
     new WindowedDataStream[R](javaWStream)
 
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R]
=
-    new SplitDataStream[R](javaStream)
+  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R]
=
+    new SplitStream[R](javaStream)
 
   implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1,
IN2]):
   ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)

http://git-wip-us.apache.org/repos/asf/flink/blob/9513f0e3/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 6ecdb85..d1fd233 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -118,9 +118,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase
{
       classOf[ConnectedStreams[_,_]])
 
     checkMethods(
-      "SplitDataStream", "SplitDataStream",
-      classOf[org.apache.flink.streaming.api.datastream.SplitDataStream[_]],
-      classOf[SplitDataStream[_]])
+      "SplitStream", "SplitStream",
+      classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
+      classOf[SplitStream[_]])
 
     checkMethods(
       "WindowedStream", "WindowedStream",


Mime
View raw message