flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject flink git commit: [FLINK-2419] Add test for sinks after keyBy and groupBy
Date Wed, 29 Jul 2015 18:48:07 GMT
Repository: flink
Updated Branches:
  refs/heads/master b211a6211 -> 73af89114


[FLINK-2419] Add test for sinks after keyBy and groupBy

Closes #947


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73af8911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73af8911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73af8911

Branch: refs/heads/master
Commit: 73af891141f453d607104e28d3a0e0a3438a409b
Parents: b211a62
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Jul 28 22:01:32 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Wed Jul 29 20:47:17 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/DataStreamTest.java     | 42 ++++++++++++++++++++
 1 file changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73af8911/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 764c6f2..324143f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -460,6 +460,48 @@ public class DataStreamTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void sinkKeyTest() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		StreamGraph streamGraph = env.getStreamGraph();
+
+		DataStream<Long> sink = env.generateSequence(1, 100).print();
+		assertTrue(streamGraph.getStreamNode(sink.getId()).getStatePartitioner() == null);
+		assertTrue(streamGraph.getStreamNode(sink.getId()).getInEdges().get(0).getPartitioner()
instanceof RebalancePartitioner);
+
+		KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Long getKey(Long value) throws Exception {
+				return (long) 0;
+			}
+		};
+
+		DataStream<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
+
+		assertTrue(streamGraph.getStreamNode(sink2.getId()).getStatePartitioner() != null);
+		assertEquals(key1, streamGraph.getStreamNode(sink2.getId()).getStatePartitioner());
+		assertTrue(streamGraph.getStreamNode(sink2.getId()).getInEdges().get(0).getPartitioner()
instanceof FieldsPartitioner);
+
+		KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Long getKey(Long value) throws Exception {
+				return (long) 0;
+			}
+		};
+
+		DataStream<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
+
+		assertTrue(streamGraph.getStreamNode(sink3.getId()).getStatePartitioner() != null);
+		assertEquals(key2, streamGraph.getStreamNode(sink3.getId()).getStatePartitioner());
+		assertTrue(streamGraph.getStreamNode(sink3.getId()).getInEdges().get(0).getPartitioner()
instanceof FieldsPartitioner);
+	}
 
 	@Test
 	public void testChannelSelectors() {


Mime
View raw message