flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator
Date Mon, 29 Feb 2016 20:38:06 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 74c62b0b8 -> 30486905b


http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4c1c265..fb7ec9f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment {
 
 		SourceFunction<OUT> function;
 		try {
-			function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()),
data);
+			function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
 		}
 		catch (IOException e) {
 			throw new RuntimeException(e.getMessage(), e);
@@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment {
 	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data,
TypeInformation<OUT> typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
 
-		SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromIteratorFunction<>(data);
 		return addSource(function, "Collection Source", typeInfo);
 	}
 
@@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment {
 	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT>
iterator, TypeInformation<OUT>
 			typeInfo, String operatorName) {
-		return addSource(new FromSplittableIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+		return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
 	}
 
 	/**
@@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment {
 	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
 			TypeInformation<OUT> typeInfo, String sourceName) {
-		FileSourceFunction<OUT> function = new FileSourceFunction<OUT>(inputFormat,
typeInfo);
-		return addSource(function, sourceName).returns(typeInfo);
+		FileSourceFunction<OUT> function = new FileSourceFunction<>(inputFormat, typeInfo);
+		return addSource(function, sourceName, typeInfo);
 	}
 
 	/**
@@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment {
 			sourceOperator = new StreamSource<>(function);
 		}
 
-		return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName);
+		return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 7a4d6f8..cf48160 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -435,7 +435,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase
{
 		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L,
0L));
 		env.setParallelism(10);
 
-		SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long,
Long>, Long>() {
+		SingleOutputStreamOperator<Long> map = src.map(new MapFunction<Tuple2<Long,
Long>, Long>() {
 			@Override
 			public Long map(Tuple2<Long, Long> value) throws Exception {
 				return null;
@@ -759,7 +759,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase
{
 
 	@SuppressWarnings("rawtypes,unchecked")
 	private static Integer createDownStreamId(ConnectedStreams dataStream) {
-		SingleOutputStreamOperator<?, ?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long,
Long>, Tuple2<Long, Long>, Object>() {
+		SingleOutputStreamOperator<?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long,
Long>, Tuple2<Long, Long>, Object>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index bd97e84..cd73b41 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -68,7 +68,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source = env.fromElements(1, 10);
 
 		IterativeStream<Integer> iter1 = source.iterate();
-		SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
+		SingleOutputStreamOperator<Integer> map1 = iter1.map(NoOpIntMap);
 		iter1.closeWith(map1).print();
 	}
 
@@ -289,8 +289,9 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
 
 		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
-		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM
/ 2).rebalance().name(
-				"shuffle");
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap)
+				.setParallelism(DEFAULT_PARALLELISM / 2)
+				.name("shuffle").rebalance();
 		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM
/ 2)
 				.addSink(new ReceiveCheckNoOpSink<Integer>());
 		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
@@ -302,7 +303,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 		iter1.closeWith(
 				source3.select("even").union(
-						head1.map(NoOpIntMap).broadcast().name("bc"),
+						head1.map(NoOpIntMap).name("bc").broadcast(),
 						head2.map(NoOpIntMap).shuffle()));
 
 		StreamGraph graph = env.getStreamGraph();

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 4c0f59f..ddda82d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -80,7 +80,7 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
 		assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
 				source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
 
-		SingleOutputStreamOperator<String, ?> map = source.map(new MapFunction<Long, String>()
{
+		SingleOutputStreamOperator<String> map = source.map(new MapFunction<Long, String>()
{
 
 			@Override
 			public String map(Long value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index d1f92c6..46a985c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -110,7 +110,7 @@ public class StreamGraphGeneratorTest {
 				.shuffle();
 
 
-		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+		SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
 				.map(new NoOpIntMap());
 
 		unionedMap.addSink(new NoOpSink<Integer>());
@@ -163,7 +163,7 @@ public class StreamGraphGeneratorTest {
 
 		EvenOddOutputSelector selector = new EvenOddOutputSelector();
 
-		SingleOutputStreamOperator<Integer, ?> unionedMap = map1.union(map2).union(map3)
+		SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
 				.broadcast()
 				.split(selector)
 				.select("foo")
@@ -240,6 +240,7 @@ public class StreamGraphGeneratorTest {
 	private static class OutputTypeConfigurableOperationWithTwoInputs
 			extends AbstractStreamOperator<Integer>
 			implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer>
{
+		private static final long serialVersionUID = 1L;
 
 		TypeInformation<Integer> tpeInformation;
 
@@ -253,12 +254,12 @@ public class StreamGraphGeneratorTest {
 		}
 
 		@Override
-		public void processElement1(StreamRecord element) throws Exception {
+		public void processElement1(StreamRecord<Integer> element) throws Exception {
 			output.collect(element);
 		}
 
 		@Override
-		public void processElement2(StreamRecord element) throws Exception {
+		public void processElement2(StreamRecord<Integer> element) throws Exception {
 			output.collect(element);
 		}
 
@@ -275,6 +276,7 @@ public class StreamGraphGeneratorTest {
 	private static class OutputTypeConfigurableOperationWithOneInput
 			extends AbstractStreamOperator<Integer>
 			implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer>
{
+		private static final long serialVersionUID = 1L;
 
 		TypeInformation<Integer> tpeInformation;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5f99ad8..fda1ebc 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream =>
JavaAllWi
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks,
AscendingTimestampExtractor, TimestampExtractor}
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
@@ -125,7 +124,7 @@ class DataStream[T](stream: JavaStream[T]) {
    */
   def setParallelism(parallelism: Int): DataStream[T] = {
     stream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
+      case ds: SingleOutputStreamOperator[T] => ds.setParallelism(parallelism)
       case _ =>
         throw new UnsupportedOperationException(
           "Operator " + stream + " cannot set the parallelism.")
@@ -140,7 +139,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * @return Name of the stream.
    */
   def name: String = stream match {
-    case stream : SingleOutputStreamOperator[T,_] => stream.getName
+    case stream : SingleOutputStreamOperator[T] => stream.getName
     case _ => throw new
         UnsupportedOperationException("Only supported for operators.")
   }
@@ -165,7 +164,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * @return The named operator
    */
   def name(name: String) : DataStream[T] = stream match {
-    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.name(name))
+    case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.name(name))
     case _ => throw new UnsupportedOperationException("Only supported for operators.")
     this
   }
@@ -184,7 +183,7 @@ class DataStream[T](stream: JavaStream[T]) {
     */
   @PublicEvolving
   def uid(uid: String) : DataStream[T] = javaStream match {
-    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.uid(uid))
+    case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.uid(uid))
     case _ => throw new UnsupportedOperationException("Only supported for operators.")
     this
   }
@@ -199,7 +198,7 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def disableChaining(): DataStream[T] = {
     stream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining()
+      case ds: SingleOutputStreamOperator[T] => ds.disableChaining()
       case _ =>
         throw new UnsupportedOperationException("Only supported for operators.")
     }
@@ -215,7 +214,7 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def startNewChain(): DataStream[T] = {
     stream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain()
+      case ds: SingleOutputStreamOperator[T] => ds.startNewChain()
       case _ =>
         throw new UnsupportedOperationException("Only supported for operators.")
     }
@@ -238,8 +237,7 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def slotSharingGroup(slotSharingGroup: String): DataStream[T] = {
     stream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.slotSharingGroup(slotSharingGroup)
-      case sink: DataStreamSink[_] => sink.slotSharingGroup(slotSharingGroup)
+      case ds: SingleOutputStreamOperator[T] => ds.slotSharingGroup(slotSharingGroup)
       case _ =>
         throw new UnsupportedOperationException("Only supported for operators.")
     }
@@ -256,7 +254,7 @@ class DataStream[T](stream: JavaStream[T]) {
    */
   def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
     stream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis)
+      case ds: SingleOutputStreamOperator[T] => ds.setBufferTimeout(timeoutMillis)
       case _ =>
         throw new UnsupportedOperationException("Only supported for operators.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index e93b27b..cfa71ea 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -110,7 +110,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase
{
 
     checkMethods(
       "SingleOutputStreamOperator", "DataStream",
-      classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_,_]],
+      classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_]],
       classOf[DataStream[_]])
 
     checkMethods(


Mime
View raw message