flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [5/9] flink git commit: [FLINK-6550] Reject null OutputTags in Context#collect(OutputTag<X>, X)
Date Tue, 18 Jul 2017 18:59:00 GMT
[FLINK-6550] Reject null OutputTags in Context#collect(OutputTag<X>, X)

This closes #4312.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daed1eec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daed1eec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daed1eec

Branch: refs/heads/master
Commit: daed1eecbab283803c48c13fb380d442f3cfd1a5
Parents: ec1044d
Author: zentol <chesnay@apache.org>
Authored: Mon Jun 19 15:57:14 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Tue Jul 18 17:03:12 2017 +0200

----------------------------------------------------------------------
 .../api/operators/KeyedProcessOperator.java     |  3 ++
 .../api/operators/ProcessOperator.java          |  3 ++
 .../api/operators/ProcessOperatorTest.java      | 31 ++++++++++++++++++++
 3 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daed1eec/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index a46897c..5537b5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -119,6 +119,9 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 		@Override
 		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
 			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/daed1eec/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index f08c1ee..5c9e8fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -99,6 +99,9 @@ public class ProcessOperator<IN, OUT>
 
 		@Override
 		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
 			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/daed1eec/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 35ab00c..5476123 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -36,6 +38,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class ProcessOperatorTest extends TestLogger {
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
@@ -94,6 +99,32 @@ public class ProcessOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
+	@Test
+	public void testNullOutputTagRefusal() throws Exception {
+		ProcessOperator<Integer, String> operator = new ProcessOperator<>(new NullOutputTagEmittingProcessFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		try {
+			expectedException.expect(IllegalArgumentException.class);
+			testHarness.processElement(new StreamRecord<>(5));
+		} finally {
+			testHarness.close();
+		}
+	}
+
+	private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer,
String> {
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws
Exception {
+			ctx.output(null, value);
+		}
+	}
+
 	private static class QueryingProcessFunction extends ProcessFunction<Integer, String>
{
 
 		private static final long serialVersionUID = 1L;


Mime
View raw message