flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-6552] Allow differing types for side outputs
Date Mon, 15 May 2017 20:34:05 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 89e90f31e -> 446d651c1


[FLINK-6552] Allow differing types for side outputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/446d651c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/446d651c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/446d651c

Branch: refs/heads/release-1.3
Commit: 446d651c11a6dd0f78c3b39870d192a09acbe38f
Parents: 89e90f3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri May 12 14:40:44 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon May 15 12:54:00 2017 -0400

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  |  7 +--
 .../streaming/runtime/SideOutputITCase.java     | 55 ++++++++++++++++++--
 .../runtime/util/TestListResultSink.java        |  6 ++-
 3 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/446d651c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 5dd651c..2784517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -318,9 +318,10 @@ public class StreamGraph extends StreamingPlan {
 				continue;
 			}
 
-			if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
-				throw new IllegalArgumentException("Trying to add a side input for the same id " +
-						"with a different type. This is not allowed.");
+			if (tag.f1.getId().equals(outputTag.getId()) &&
+					!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
+				throw new IllegalArgumentException("Trying to add a side output for the same" +
+						"side-output id with a different type. This is not allowed.");
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/446d651c/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 27124cc..765eae5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -160,14 +160,25 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 		env.execute();
 
 		assertEquals(
-				Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE),
+				Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
+						"WM:0", "WM:0", "WM:0",
+						"WM:2", "WM:2", "WM:2" ,
+						"WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
 				sideOutputResultSink1.getSortedResult());
 
 		assertEquals(
-				Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE),
+				Arrays.asList("E:sideout-1", "E:sideout-2", "E:sideout-3", "E:sideout-4", "E:sideout-5",
+						"WM:0", "WM:0", "WM:0",
+						"WM:2", "WM:2", "WM:2" ,
+						"WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
 				sideOutputResultSink1.getSortedResult());
 
-		assertEquals(Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5", "WM:0", "WM:2", "WM:" + Long.MAX_VALUE), resultSink.getSortedResult());
+		assertEquals(
+				Arrays.asList("E:1", "E:2", "E:3", "E:4", "E:5",
+						"WM:0", "WM:0", "WM:0",
+						"WM:2", "WM:2", "WM:2" ,
+						"WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE, "WM:" + Long.MAX_VALUE),
+				resultSink.getSortedResult());
 	}
 
 	@Test
@@ -242,6 +253,44 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase
implemen
 	}
 
 	@Test
+	public void testDifferentSideOutputTypes() throws Exception {
+		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("string"){};
+		final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("int"){};
+
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<Integer> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+		env.setParallelism(3);
+
+		DataStream<Integer> dataStream = env.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = dataStream
+				.process(new ProcessFunction<Integer, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void processElement(
+							Integer value, Context ctx, Collector<Integer> out) throws Exception {
+						out.collect(value);
+						ctx.output(sideOutputTag1, "sideout-" + String.valueOf(value));
+						ctx.output(sideOutputTag2, 13);
+					}
+				});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		env.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList(13, 13, 13, 13, 13), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	@Test
 	public void testSideOutputNameClash() throws Exception {
 		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side"){};
 		final OutputTag<Integer> sideOutputTag2 = new OutputTag<Integer>("side"){};

http://git-wip-us.apache.org/repos/asf/flink/blob/446d651c/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
index 321d4c5..3fabb4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.test.streaming.runtime.util;
 
+import java.util.Collections;
+import java.util.Comparator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
@@ -66,8 +68,8 @@ public class TestListResultSink<T> extends RichSinkFunction<T> {
 
 	public List<T> getSortedResult() {
 		synchronized (resultList()) {
-			TreeSet<T> treeSet = new TreeSet<T>(resultList());
-			ArrayList<T> sortedList = new ArrayList<T>(treeSet);
+			ArrayList<T> sortedList = new ArrayList<T>(resultList());
+			Collections.sort((List) sortedList);
 			return sortedList;
 		}
 	}


Mime
View raw message