Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 396F8189A7 for ; Mon, 29 Feb 2016 20:38:07 +0000 (UTC) Received: (qmail 55231 invoked by uid 500); 29 Feb 2016 20:38:07 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 55180 invoked by uid 500); 29 Feb 2016 20:38:07 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 55171 invoked by uid 99); 29 Feb 2016 20:38:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Feb 2016 20:38:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E28DFE7889; Mon, 29 Feb 2016 20:38:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Mon, 29 Feb 2016 20:38:06 -0000 Message-Id: <5d11e57ad2d1477289774270394587b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator Repository: flink Updated Branches: refs/heads/release-1.0 74c62b0b8 -> 30486905b http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 4c1c265..fb7ec9f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment { SourceFunction function; try { - function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); + function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); @@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment { public DataStreamSource fromCollection(Iterator data, TypeInformation typeInfo) { Preconditions.checkNotNull(data, "The iterator must not be null"); - SourceFunction function = new FromIteratorFunction(data); + SourceFunction function = new FromIteratorFunction<>(data); return addSource(function, "Collection Source", typeInfo); } @@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource fromParallelCollection(SplittableIterator iterator, TypeInformation typeInfo, String operatorName) { - return addSource(new FromSplittableIteratorFunction(iterator), operatorName).returns(typeInfo); + return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo); } /** @@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo, String sourceName) { - FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - return addSource(function, sourceName).returns(typeInfo); + FileSourceFunction function = new FileSourceFunction<>(inputFormat, typeInfo); + return addSource(function, sourceName, typeInfo); } /** @@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment { sourceOperator = new StreamSource<>(function); } - return new DataStreamSource(this, typeInfo, sourceOperator, isParallel, sourceName); + return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 7a4d6f8..cf48160 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -435,7 +435,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStreamSource> src = env.fromElements(new Tuple2<>(0L, 0L)); env.setParallelism(10); - SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { + SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { @Override public Long map(Tuple2 value) throws Exception { return null; @@ -759,7 +759,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("rawtypes,unchecked") private static Integer createDownStreamId(ConnectedStreams dataStream) { - SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { + SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java index bd97e84..cd73b41 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -68,7 +68,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { DataStream source = env.fromElements(1, 10); IterativeStream iter1 = source.iterate(); - SingleOutputStreamOperator map1 = iter1.map(NoOpIntMap); + SingleOutputStreamOperator map1 = iter1.map(NoOpIntMap); iter1.closeWith(map1).print(); } @@ -289,8 +289,9 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { IterativeStream iter1 = source1.union(source2).iterate(); DataStream head1 = iter1.map(NoOpIntMap).name("map1"); - DataStream head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name( - "shuffle"); + DataStream head2 = iter1.map(NoOpIntMap) + .setParallelism(DEFAULT_PARALLELISM / 2) + .name("shuffle").rebalance(); DataStreamSink head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2) .addSink(new ReceiveCheckNoOpSink()); DataStreamSink head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink()); @@ -302,7 +303,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase { iter1.closeWith( source3.select("even").union( - head1.map(NoOpIntMap).broadcast().name("bc"), + head1.map(NoOpIntMap).name("bc").broadcast(), head2.map(NoOpIntMap).shuffle())); StreamGraph graph = env.getStreamGraph(); http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index 4c0f59f..ddda82d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -80,7 +80,7 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase { assertEquals(BasicTypeInfo.LONG_TYPE_INFO, source.map(new TestMap()).returns(Long.class).getType()); - SingleOutputStreamOperator map = source.map(new MapFunction() { + SingleOutputStreamOperator map = source.map(new MapFunction() { @Override public String map(Long value) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index d1f92c6..46a985c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -110,7 +110,7 @@ public class StreamGraphGeneratorTest { .shuffle(); - SingleOutputStreamOperator unionedMap = map1.union(map2).union(map3) + SingleOutputStreamOperator unionedMap = map1.union(map2).union(map3) .map(new NoOpIntMap()); unionedMap.addSink(new NoOpSink()); @@ -163,7 +163,7 @@ public class StreamGraphGeneratorTest { EvenOddOutputSelector selector = new EvenOddOutputSelector(); - SingleOutputStreamOperator unionedMap = map1.union(map2).union(map3) + SingleOutputStreamOperator unionedMap = map1.union(map2).union(map3) .broadcast() .split(selector) .select("foo") @@ -240,6 +240,7 @@ public class StreamGraphGeneratorTest { private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator implements TwoInputStreamOperator, OutputTypeConfigurable { + private static final long serialVersionUID = 1L; TypeInformation tpeInformation; @@ -253,12 +254,12 @@ public class StreamGraphGeneratorTest { } @Override - public void processElement1(StreamRecord element) throws Exception { + public void processElement1(StreamRecord element) throws Exception { output.collect(element); } @Override - public void processElement2(StreamRecord element) throws Exception { + public void processElement2(StreamRecord element) throws Exception { output.collect(element); } @@ -275,6 +276,7 @@ public class StreamGraphGeneratorTest { private static class OutputTypeConfigurableOperationWithOneInput extends AbstractStreamOperator implements OneInputStreamOperator, OutputTypeConfigurable { + private static final long serialVersionUID = 1L; TypeInformation tpeInformation; http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 5f99ad8..fda1ebc 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWi import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor} import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} @@ -125,7 +124,7 @@ class DataStream[T](stream: JavaStream[T]) { */ def setParallelism(parallelism: Int): DataStream[T] = { stream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism) + case ds: SingleOutputStreamOperator[T] => ds.setParallelism(parallelism) case _ => throw new UnsupportedOperationException( "Operator " + stream + " cannot set the parallelism.") @@ -140,7 +139,7 @@ class DataStream[T](stream: JavaStream[T]) { * @return Name of the stream. */ def name: String = stream match { - case stream : SingleOutputStreamOperator[T,_] => stream.getName + case stream : SingleOutputStreamOperator[T] => stream.getName case _ => throw new UnsupportedOperationException("Only supported for operators.") } @@ -165,7 +164,7 @@ class DataStream[T](stream: JavaStream[T]) { * @return The named operator */ def name(name: String) : DataStream[T] = stream match { - case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.name(name)) + case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.name(name)) case _ => throw new UnsupportedOperationException("Only supported for operators.") this } @@ -184,7 +183,7 @@ class DataStream[T](stream: JavaStream[T]) { */ @PublicEvolving def uid(uid: String) : DataStream[T] = javaStream match { - case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.uid(uid)) + case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.uid(uid)) case _ => throw new UnsupportedOperationException("Only supported for operators.") this } @@ -199,7 +198,7 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def disableChaining(): DataStream[T] = { stream match { - case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining() + case ds: SingleOutputStreamOperator[T] => ds.disableChaining() case _ => throw new UnsupportedOperationException("Only supported for operators.") } @@ -215,7 +214,7 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def startNewChain(): DataStream[T] = { stream match { - case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain() + case ds: SingleOutputStreamOperator[T] => ds.startNewChain() case _ => throw new UnsupportedOperationException("Only supported for operators.") } @@ -238,8 +237,7 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def slotSharingGroup(slotSharingGroup: String): DataStream[T] = { stream match { - case ds: SingleOutputStreamOperator[_, _] => ds.slotSharingGroup(slotSharingGroup) - case sink: DataStreamSink[_] => sink.slotSharingGroup(slotSharingGroup) + case ds: SingleOutputStreamOperator[T] => ds.slotSharingGroup(slotSharingGroup) case _ => throw new UnsupportedOperationException("Only supported for operators.") } @@ -256,7 +254,7 @@ class DataStream[T](stream: JavaStream[T]) { */ def setBufferTimeout(timeoutMillis: Long): DataStream[T] = { stream match { - case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis) + case ds: SingleOutputStreamOperator[T] => ds.setBufferTimeout(timeoutMillis) case _ => throw new UnsupportedOperationException("Only supported for operators.") } http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index e93b27b..cfa71ea 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -110,7 +110,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { checkMethods( "SingleOutputStreamOperator", "DataStream", - classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_,_]], + classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_]], classOf[DataStream[_]]) checkMethods(