flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [27/37] flink git commit: [FLINK-6246] Fix generic type of OutputTag in operator Output
Date Thu, 06 Apr 2017 07:28:40 GMT
[FLINK-6246] Fix generic type of OutputTag in operator Output

This closes #3662.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdbe607
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdbe607
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdbe607

Branch: refs/heads/table-retraction
Commit: 9bdbe6071f1946391598709bfa637fd76a8c7396
Parents: 48ce77c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Apr 3 16:10:14 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Wed Apr 5 20:43:00 2017 +0200

----------------------------------------------------------------------
 .../api/collector/selector/DirectedOutput.java     |  2 +-
 .../api/operators/AbstractStreamOperator.java      |  2 +-
 .../flink/streaming/api/operators/Output.java      |  2 +-
 .../streaming/runtime/io/RecordWriterOutput.java   |  2 +-
 .../streaming/runtime/tasks/OperatorChain.java     | 17 ++++++-----------
 .../runtime/tasks/StreamIterationTail.java         |  2 +-
 .../util/AbstractStreamOperatorTestHarness.java    |  2 +-
 .../flink/streaming/util/CollectorOutput.java      |  2 +-
 .../apache/flink/streaming/util/MockOutput.java    |  2 +-
 9 files changed, 14 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index dabe804..6339506 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -140,7 +140,7 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
-	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 		throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index cc81a0e..7569170 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -872,7 +872,7 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			numRecordsOut.inc();
 			output.collect(outputTag, record);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index eb10d8d..7035d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -48,7 +48,7 @@ public interface Output<T> extends Collector<T> {
 	 *
 	 * @param record The record to collect.
 	 */
-	<X> void collect(OutputTag<?> outputTag, StreamRecord<X> record);
+	<X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);
 
 	void emitLatencyMarker(LatencyMarker latencyMarker);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d22c60d..365c78e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -85,7 +85,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
-	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
 			// we are only responsible for emitting to the side-output specified by our
 			// OutputTag.

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 499d4a3..be4b456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -423,7 +423,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
 				// we are only responsible for emitting to the side-output specified by our
 				// OutputTag.
@@ -506,7 +506,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
 				// we are only responsible for emitting to the side-output specified by our
 				// OutputTag.
@@ -579,7 +579,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			for (Output<StreamRecord<T>> output : outputs) {
 				output.collect(outputTag, record);
 			}
@@ -619,21 +619,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			for (int i = 0; i < outputs.length - 1; i++) {
 				Output<StreamRecord<T>> output = outputs[i];
 
-				// due to side outputs, StreamRecords of varying types can pass through the broadcasting
-				// collector so we need to cast
-				@SuppressWarnings({"unchecked", "rawtypes"})
-				StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
+				StreamRecord<X> shallowCopy = record.copy(record.getValue());
 				output.collect(outputTag, shallowCopy);
 			}
 
 			// don't copy for the last output
-			@SuppressWarnings({"unchecked", "rawtypes"})
-			StreamRecord<T> castRecord = (StreamRecord<T>) record;
-			outputs[outputs.length - 1].collect(outputTag, castRecord);
+			outputs[outputs.length - 1].collect(outputTag, record);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 35d14e7..e40f834 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -123,7 +123,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			throw new UnsupportedOperationException("Side outputs not used in iteration tail");
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 912d579..a2a4f79 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -656,7 +656,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		}
 
 		@Override
-		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 			sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig);
 
 			ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index 07b37c8..bd929da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -54,7 +54,7 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
 	}
 
 	@Override
-	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 		throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 867080c..8c3226b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -41,7 +41,7 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
 	}
 
 	@Override
-	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
 		throw new UnsupportedOperationException("Side output not supported for MockOutput");
 	}
 


Mime
View raw message