From: rmetzger@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Tue, 05 Jul 2016 14:38:48 -0000
Message-Id: <48fa1556552f49328439545d3badd013@git.apache.org>
Subject: [11/15] flink git commit: [FLINK-3995] [build] flink-test-utils also contains the streaming test utilities.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
new file mode 100644
index 0000000..da3de3d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("serial") +public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { + + private static List testResults; + + @Test + public void testCoGroup() throws Exception { + + testResults = new ArrayList<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream> source1 = env.addSource(new SourceFunction>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + ctx.collect(Tuple2.of("a", 2)); + + ctx.collect(Tuple2.of("b", 3)); + ctx.collect(Tuple2.of("b", 4)); + ctx.collect(Tuple2.of("b", 5)); + + ctx.collect(Tuple2.of("a", 6)); + ctx.collect(Tuple2.of("a", 7)); + ctx.collect(Tuple2.of("a", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() { + } + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); + + DataStream> source2 = env.addSource(new SourceFunction>() { + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + + ctx.collect(Tuple2.of("b", 3)); + + ctx.collect(Tuple2.of("c", 6)); + ctx.collect(Tuple2.of("c", 7)); + ctx.collect(Tuple2.of("c", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() { + } + }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor()); + + + source1.coGroup(source2) + .where(new Tuple2KeyExtractor()) + .equalTo(new Tuple2KeyExtractor()) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CoGroupFunction, Tuple2, String>() { + @Override + public void coGroup(Iterable> first, + Iterable> second, + Collector out) throws Exception { + StringBuilder result = new StringBuilder(); + result.append("F:"); + for (Tuple2 t: first) { + result.append(t.toString()); + } + result.append(" S:"); + for (Tuple2 t: second) { + result.append(t.toString()); + } + out.collect(result.toString()); + } + }) + .addSink(new 
SinkFunction() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("CoGroup Test"); + + List expectedResult = Arrays.asList( + "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", + "F:(b,3)(b,4)(b,5) S:(b,3)", + "F:(a,6)(a,7)(a,8) S:", + "F: S:(c,6)(c,7)(c,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testJoin() throws Exception { + + testResults = new ArrayList<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream> source1 = env.addSource(new SourceFunction>() { + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + ctx.collect(Tuple3.of("a", "z", 2)); + + ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() {} + + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); + + DataStream> source2 = env.addSource(new SourceFunction>() { + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "u", 0)); + ctx.collect(Tuple3.of("a", "w", 1)); + + ctx.collect(Tuple3.of("b", "i", 3)); + ctx.collect(Tuple3.of("b", "k", 5)); + + ctx.collect(Tuple3.of("a", "x", 6)); + ctx.collect(Tuple3.of("a", "z", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() {} + + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); + + + source1.join(source2) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction, Tuple3, String>() { + @Override + public String join(Tuple3 first, Tuple3 second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Join Test"); + + List expectedResult = Arrays.asList( + "(a,x,0):(a,u,0)", + "(a,x,0):(a,w,1)", + "(a,y,1):(a,u,0)", + "(a,y,1):(a,w,1)", + "(a,z,2):(a,u,0)", + "(a,z,2):(a,w,1)", + "(b,u,3):(b,i,3)", + "(b,u,3):(b,k,5)", + "(b,w,5):(b,i,3)", + "(b,w,5):(b,k,5)", + "(a,i,6):(a,x,6)", + "(a,i,6):(a,z,8)", + "(a,j,7):(a,x,6)", + "(a,j,7):(a,z,8)", + "(a,k,8):(a,x,6)", + "(a,k,8):(a,z,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testSelfJoin() throws Exception { + + testResults = new ArrayList<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream> source1 = env.addSource(new SourceFunction>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + ctx.collect(Tuple3.of("a", "z", 2)); + + 
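+				// Editor's note: with tumbling event-time windows of 3 ms the
+				// timestamps 0-2, 3-5 and 6-8 fall into [0,3), [3,6) and [6,9),
+				// so the self-join below yields the per-key cross product of
+				// each window's elements (9 + 4 + 9 pairs, as asserted later).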
ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + @Override + public void cancel() { + } + }).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor()); + + source1.join(source1) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction, Tuple3, String>() { + @Override + public String join(Tuple3 first, Tuple3 second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Self-Join Test"); + + List expectedResult = Arrays.asList( + "(a,x,0):(a,x,0)", + "(a,x,0):(a,y,1)", + "(a,x,0):(a,z,2)", + "(a,y,1):(a,x,0)", + "(a,y,1):(a,y,1)", + "(a,y,1):(a,z,2)", + "(a,z,2):(a,x,0)", + "(a,z,2):(a,y,1)", + "(a,z,2):(a,z,2)", + "(b,u,3):(b,u,3)", + "(b,u,3):(b,w,5)", + "(b,w,5):(b,u,3)", + "(b,w,5):(b,w,5)", + "(a,i,6):(a,i,6)", + "(a,i,6):(a,j,7)", + "(a,i,6):(a,k,8)", + "(a,j,7):(a,i,6)", + "(a,j,7):(a,j,7)", + "(a,j,7):(a,k,8)", + "(a,k,8):(a,i,6)", + "(a,k,8):(a,j,7)", + "(a,k,8):(a,k,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks> { + + @Override + public long extractTimestamp(Tuple2 element, long previousTimestamp) { + return element.f1; + } + + @Override + public Watermark checkAndGetNextWatermark(Tuple2 element, long extractedTimestamp) { + return new Watermark(extractedTimestamp - 1); + } + } + + private static class Tuple3TimestampExtractor implements AssignerWithPunctuatedWatermarks> { + + @Override + public long extractTimestamp(Tuple3 element, long previousTimestamp) { + return element.f2; + } + + @Override + public Watermark checkAndGetNextWatermark(Tuple3 lastElement, long extractedTimestamp) { + return new Watermark(lastElement.f2 - 1); + } + } + + private static class Tuple2KeyExtractor implements KeySelector, String> { + + @Override + public String getKey(Tuple2 value) throws Exception { + return value.f0; + } + } + + private static class Tuple3KeyExtractor implements KeySelector, String> { + + @Override + public String getKey(Tuple3 value) throws Exception { + return value.f0; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java new file mode 100644 index 0000000..360ceb3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class CoStreamITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void test() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		DataStream<Integer> src = env.fromElements(1, 3, 5);
+
+		DataStream<Integer> filter1 = src
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) throws Exception {
+						return true;
+					}
+				})
+
+				.keyBy(new KeySelector<Integer, Integer>() {
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				});
+
+		DataStream<Tuple2<Integer, Integer>> filter2 = src
+				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
+
+					@Override
+					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+						return new Tuple2<>(value, value + 1);
+					}
+				})
+				.rebalance()
+				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
+
+					@Override
+					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
+						return true;
+					}
+				})
+				.disableChaining()
+				.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+						return value.f0;
+					}
+				});
+
+		DataStream<String> connected = filter1.connect(filter2)
+				.flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
+
+					@Override
+					public void flatMap1(Integer value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+
+					@Override
+					public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+				});
+
+		connected.addSink(resultSink);
+
+		env.execute();
+
+		List<String> expected = Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5");
+		List<String> result = resultSink.getResult();
+		Collections.sort(result);
+		assertEquals(expected, result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java new file mode 100644 index 0000000..c345b37 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Integration test for streaming programs using POJOs and key selectors + * + * See FLINK-3697 + */ +public class DataStreamPojoITCase extends StreamingMultipleProgramsTestBase { + static List elements = new ArrayList<>(); + static { + elements.add(new Data(0,0,0)); + elements.add(new Data(0,0,0)); + elements.add(new Data(1,1,1)); + elements.add(new Data(1,1,1)); + elements.add(new Data(2,2,3)); + elements.add(new Data(2,2,3)); + } + + /** + * Test composite key on the Data POJO (with nested fields) + */ + @Test + public void testCompositeKeyOnNestedPojo() throws Exception { + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.getConfig().disableObjectReuse(); + see.setParallelism(3); + + DataStream dataStream = see.fromCollection(elements); + + DataStream summedStream = dataStream + .keyBy("aaa", "abc", "wxyz") + .sum("sum") + .keyBy("aaa", "abc", "wxyz") + .flatMap(new FlatMapFunction() { + Data[] first = new Data[3]; + @Override + public void flatMap(Data value, Collector out) throws Exception { + if(first[value.aaa] == null) { + first[value.aaa] = value; + if(value.sum != 1) { + throw new RuntimeException("Expected the sum to be one"); + } + } else { + if(value.sum != 2) { + throw new RuntimeException("Expected the sum to be two"); + } + if(first[value.aaa].aaa != value.aaa) { + throw new RuntimeException("aaa key wrong"); + } + if(first[value.aaa].abc != value.abc) { + throw new RuntimeException("abc key wrong"); + } + if(first[value.aaa].wxyz != value.wxyz) { + throw new RuntimeException("wxyz key wrong"); + } + } + } + }); + + summedStream.print(); + + see.execute(); + } + + /** + * Test composite & nested key on the Data POJO + */ + @Test + public void testNestedKeyOnNestedPojo() throws Exception { + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.getConfig().disableObjectReuse(); + see.setParallelism(4); + + DataStream dataStream = see.fromCollection(elements); + + DataStream 
summedStream = dataStream + .keyBy("aaa", "stats.count") + .sum("sum") + .keyBy("aaa", "stats.count") + .flatMap(new FlatMapFunction() { + Data[] first = new Data[3]; + @Override + public void flatMap(Data value, Collector out) throws Exception { + if(value.stats.count != 123) { + throw new RuntimeException("Wrong value for value.stats.count"); + } + if(first[value.aaa] == null) { + first[value.aaa] = value; + if(value.sum != 1) { + throw new RuntimeException("Expected the sum to be one"); + } + } else { + if(value.sum != 2) { + throw new RuntimeException("Expected the sum to be two"); + } + if(first[value.aaa].aaa != value.aaa) { + throw new RuntimeException("aaa key wrong"); + } + if(first[value.aaa].abc != value.abc) { + throw new RuntimeException("abc key wrong"); + } + if(first[value.aaa].wxyz != value.wxyz) { + throw new RuntimeException("wxyz key wrong"); + } + } + } + }); + + summedStream.print(); + + see.execute(); + } + + + /** + * As per FLINK-3702 Flink doesn't support nested pojo fields for sum() + */ + @Test(expected = IllegalArgumentException.class) + public void testFailOnNestedPojoFieldAccessor() throws Exception { + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = see.fromCollection(elements); + dataStream.keyBy("aaa", "stats.count").sum("stats.count"); + } + + public static class Data { + public int sum; // sum + public int aaa; // keyBy + public int abc; //keyBy + public long wxyz; // keyBy + public int t1; + public int t2; + public Policy policy; + public Stats stats; + + public Data() { + } + public Data(int aaa, int abc, int wxyz) { + this.sum = 1; + this.aaa = aaa; + this.abc = abc; + this.wxyz = wxyz; + this.stats = new Stats(); + this.stats.count = 123L; + } + + @Override + public String toString() { + return "Data{" + + "sum=" + sum + + ", aaa=" + aaa + + ", abc=" + abc + + ", wxyz=" + wxyz + + '}'; + } + } + public static class Policy { + public short a; + public short b; + public boolean c; + public boolean d; + + public Policy() {} + } + + public static class Stats { + public long count; + public float a; + public float b; + public float c; + public float d; + public float e; + + public Stats() {} + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java new file mode 100644 index 0000000..8b84112 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class DirectedOutputITCase extends StreamingMultipleProgramsTestBase {
+
+	private static final String TEN = "ten";
+	private static final String ODD = "odd";
+	private static final String EVEN = "even";
+	private static final String NON_SELECTED = "nonSelected";
+
+	static final class MyOutputSelector implements OutputSelector<Long> {
+		private static final long serialVersionUID = 1L;
+
+		List<String> outputs = new ArrayList<String>();
+
+		@Override
+		public Iterable<String> select(Long value) {
+			outputs.clear();
+			if (value % 2 == 0) {
+				outputs.add(EVEN);
+			} else {
+				outputs.add(ODD);
+			}
+
+			if (value == 10L) {
+				outputs.add(TEN);
+			}
+
+			if (value == 11L) {
+				outputs.add(NON_SELECTED);
+			}
+			return outputs;
+		}
+	}
+
+	@Test
+	public void outputSelectorTest() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
+
+		SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+		source.select(EVEN).addSink(evenSink);
+		source.select(ODD, TEN).addSink(oddAndTenSink);
+		source.select(EVEN, ODD).addSink(evenAndOddSink);
+		source.addSink(allSink);
+
+		env.execute();
+		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				evenAndOddSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				allSink.getSortedResult());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
new file mode 100644
index 0000000..1fbebd0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -0,0 +1,703 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.IterativeStream; +import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; +import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector; +import org.apache.flink.test.streaming.runtime.util.NoOpIntMap; +import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MathUtils; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings({ "unchecked", "unused", "serial" }) +public class IterateITCase extends StreamingMultipleProgramsTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(IterateITCase.class); + + private static boolean iterated[]; + + @Test(expected = UnsupportedOperationException.class) + public void testIncorrectParallelism() throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source = env.fromElements(1, 10); + + IterativeStream iter1 = source.iterate(); + SingleOutputStreamOperator map1 = iter1.map(NoOpIntMap); + iter1.closeWith(map1).print(); + } + + @Test + public void testDoubleClosing() throws Exception { + + 
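+		// Editor's note: iterate()/closeWith() define a feedback loop; a
+		// minimal sketch of the pattern (names as in this test):
+		//
+		//   IterativeStream<Integer> it = source.iterate();
+		//   it.closeWith(it.map(NoOpIntMap));   // feedback edge back to the head
+		//
+		// This test registers the feedback edge twice and expects no failure.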
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // introduce dummy mapper to get to correct parallelism + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); + + IterativeStream iter1 = source.iterate(); + + iter1.closeWith(iter1.map(NoOpIntMap)); + iter1.closeWith(iter1.map(NoOpIntMap)); + } + + + @Test(expected = UnsupportedOperationException.class) + public void testDifferingParallelism() throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // introduce dummy mapper to get to correct parallelism + DataStream source = env.fromElements(1, 10) + .map(NoOpIntMap); + + IterativeStream iter1 = source.iterate(); + + + iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)); + + } + + + @Test(expected = UnsupportedOperationException.class) + public void testCoDifferingParallelism() throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // introduce dummy mapper to get to correct parallelism + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); + + ConnectedIterativeStreams coIter = source.iterate().withFeedbackType( + Integer.class); + + + coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2)); + + } + + @Test(expected = UnsupportedOperationException.class) + public void testClosingFromOutOfLoop() throws Exception { + + // this test verifies that we cannot close an iteration with a DataStream that does not + // have the iteration in its predecessors + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // introduce dummy mapper to get to correct parallelism + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); + + IterativeStream iter1 = source.iterate(); + IterativeStream iter2 = source.iterate(); + + + iter2.closeWith(iter1.map(NoOpIntMap)); + + } + + @Test(expected = UnsupportedOperationException.class) + public void testCoIterClosingFromOutOfLoop() throws Exception { + + // this test verifies that we cannot close an iteration with a DataStream that does not + // have the iteration in its predecessors + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // introduce dummy mapper to get to correct parallelism + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); + + IterativeStream iter1 = source.iterate(); + ConnectedIterativeStreams coIter = source.iterate().withFeedbackType( + Integer.class); + + + coIter.closeWith(iter1.map(NoOpIntMap)); + + } + + @Test(expected = IllegalStateException.class) + public void testExecutionWithEmptyIteration() throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); + + IterativeStream iter1 = source.iterate(); + + iter1.map(NoOpIntMap).print(); + + env.execute(); + } + + @Test + public void testImmutabilityWithCoiteration() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance + + IterativeStream iter1 = source.iterate(); + // Calling withFeedbackType should create a new iteration + ConnectedIterativeStreams iter2 = iter1.withFeedbackType(String.class); + + iter1.closeWith(iter1.map(NoOpIntMap)).print(); + iter2.closeWith(iter2.map(NoOpCoMap)).print(); + + StreamGraph graph = env.getStreamGraph(); + 
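+		// Editor's note: every iterate() call is materialized in the
+		// StreamGraph as a hidden iteration source/sink pair; two pairs are
+		// asserted below because withFeedbackType() created a second,
+		// independent co-iteration instead of mutating iter1.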
+ assertEquals(2, graph.getIterationSourceSinkPairs().size()); + + for (Tuple2 sourceSinkPair: graph.getIterationSourceSinkPairs()) { + assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex()); + } + } + + @Test + public void testmultipleHeadsTailsSimple() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source1 = env.fromElements(1, 2, 3, 4, 5) + .shuffle() + .map(NoOpIntMap).name("ParallelizeMapShuffle"); + DataStream source2 = env.fromElements(1, 2, 3, 4, 5) + .map(NoOpIntMap).name("ParallelizeMapRebalance"); + + IterativeStream iter1 = source1.union(source2).iterate(); + + DataStream head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2); + DataStream head2 = iter1.map(NoOpIntMap).name("IterForwardMap"); + DataStreamSink head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink()); + DataStreamSink head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink()); + + SplitStream source3 = env.fromElements(1, 2, 3, 4, 5) + .map(NoOpIntMap).name("EvenOddSourceMap") + .split(new EvenOddOutputSelector()); + + iter1.closeWith(source3.select("even").union( + head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle())); + + StreamGraph graph = env.getStreamGraph(); + + JobGraph jg = graph.getJobGraph(); + + assertEquals(1, graph.getIterationSourceSinkPairs().size()); + + Tuple2 sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next(); + StreamNode itSource = sourceSinkPair.f0; + StreamNode itSink = sourceSinkPair.f1; + + assertEquals(4, itSource.getOutEdges().size()); + assertEquals(3, itSink.getInEdges().size()); + + assertEquals(itSource.getParallelism(), itSink.getParallelism()); + + for (StreamEdge edge : itSource.getOutEdges()) { + if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + } else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) { + assertTrue(edge.getPartitioner() instanceof ForwardPartitioner); + } + } + for (StreamEdge edge : itSink.getInEdges()) { + if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) { + assertTrue(edge.getPartitioner() instanceof ShufflePartitioner); + } + + if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) { + assertTrue(edge.getPartitioner() instanceof ForwardPartitioner); + } + + if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) { + assertTrue(edge.getPartitioner() instanceof ForwardPartitioner); + assertTrue(edge.getSelectedNames().contains("even")); + } + } + + // Test co-location + + JobVertex itSource1 = null; + JobVertex itSink1 = null; + + for (JobVertex vertex : jg.getVertices()) { + if (vertex.getName().contains("IterationSource")) { + itSource1 = vertex; + } else if (vertex.getName().contains("IterationSink")) { + + itSink1 = vertex; + + } + } + + assertTrue(itSource1.getCoLocationGroup() != null); + assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup()); + } + + @Test + public void testmultipleHeadsTailsWithTailPartitioning() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream source1 = env.fromElements(1, 2, 3, 4, 5) + .shuffle() + .map(NoOpIntMap); + + DataStream source2 = env.fromElements(1, 2, 3, 4, 
5) + .map(NoOpIntMap); + + IterativeStream iter1 = source1.union(source2).iterate(); + + DataStream head1 = iter1.map(NoOpIntMap).name("map1"); + DataStream head2 = iter1.map(NoOpIntMap) + .setParallelism(DEFAULT_PARALLELISM / 2) + .name("shuffle").rebalance(); + DataStreamSink head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2) + .addSink(new ReceiveCheckNoOpSink()); + DataStreamSink head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink()); + + SplitStream source3 = env.fromElements(1, 2, 3, 4, 5) + .map(NoOpIntMap) + .name("split") + .split(new EvenOddOutputSelector()); + + iter1.closeWith( + source3.select("even").union( + head1.map(NoOpIntMap).name("bc").broadcast(), + head2.map(NoOpIntMap).shuffle())); + + StreamGraph graph = env.getStreamGraph(); + + JobGraph jg = graph.getJobGraph(); + + assertEquals(1, graph.getIterationSourceSinkPairs().size()); + + Tuple2 sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next(); + StreamNode itSource = sourceSinkPair.f0; + StreamNode itSink = sourceSinkPair.f1; + + assertEquals(4, itSource.getOutEdges().size()); + assertEquals(3, itSink.getInEdges().size()); + + + assertEquals(itSource.getParallelism(), itSink.getParallelism()); + + for (StreamEdge edge : itSource.getOutEdges()) { + if (edge.getTargetVertex().getOperatorName().equals("map1")) { + assertTrue(edge.getPartitioner() instanceof ForwardPartitioner); + assertEquals(4, edge.getTargetVertex().getParallelism()); + } else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + assertEquals(2, edge.getTargetVertex().getParallelism()); + } + } + for (StreamEdge edge : itSink.getInEdges()) { + String tailName = edge.getSourceVertex().getOperatorName(); + if (tailName.equals("split")) { + assertTrue(edge.getPartitioner() instanceof ForwardPartitioner); + assertTrue(edge.getSelectedNames().contains("even")); + } else if (tailName.equals("bc")) { + assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner); + } else if (tailName.equals("shuffle")) { + assertTrue(edge.getPartitioner() instanceof ShufflePartitioner); + } + } + + // Test co-location + + JobVertex itSource1 = null; + JobVertex itSink1 = null; + + for (JobVertex vertex : jg.getVertices()) { + if (vertex.getName().contains("IterationSource")) { + itSource1 = vertex; + } else if (vertex.getName().contains("IterationSink")) { + itSink1 = vertex; + } + } + + assertTrue(itSource1.getCoLocationGroup() != null); + assertTrue(itSink1.getCoLocationGroup() != null); + + assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup()); + } + + @SuppressWarnings("rawtypes") + @Test + public void testSimpleIteration() throws Exception { + int numRetries = 5; + int timeoutScale = 1; + + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + iterated = new boolean[DEFAULT_PARALLELISM]; + + DataStream source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); + + IterativeStream iteration = source.iterate(3000 * timeoutScale); + + DataStream increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); + + iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink()); + + iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink()); + + env.execute(); + + for (boolean iter : iterated) { + assertTrue(iter); + } + + break; // success + 
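+				// Editor's note: the enclosing retry loop doubles the iteration
+				// timeout on each failure (timeoutScale *= 2 in the catch block
+				// below), trading determinism for robustness on slow machines.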
} catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } + } + + @Test + public void testCoIteration() throws Exception { + int numRetries = 5; + int timeoutScale = 1; + + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + TestSink.collected = new ArrayList<>(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream otherSource = env.fromElements("1000", "2000") + .map(NoOpStrMap).name("ParallelizeMap"); + + + ConnectedIterativeStreams coIt = env.fromElements(0, 0) + .map(NoOpIntMap).name("ParallelizeMap") + .iterate(2000 * timeoutScale) + .withFeedbackType("String"); + + try { + coIt.keyBy(1, 2); + fail(); + } catch (InvalidProgramException e) { + // this is expected + } + + DataStream head = coIt + .flatMap(new RichCoFlatMapFunction() { + + private static final long serialVersionUID = 1L; + boolean seenFromSource = false; + + @Override + public void flatMap1(Integer value, Collector out) throws Exception { + out.collect(((Integer) (value + 1)).toString()); + } + + @Override + public void flatMap2(String value, Collector out) throws Exception { + Integer intVal = Integer.valueOf(value); + if (intVal < 2) { + out.collect(((Integer) (intVal + 1)).toString()); + } + if (intVal == 1000 || intVal == 2000) { + seenFromSource = true; + } + } + + @Override + public void close() { + assertTrue(seenFromSource); + } + }); + + coIt.map(new CoMapFunction() { + + @Override + public String map1(Integer value) throws Exception { + return value.toString(); + } + + @Override + public String map2(String value) throws Exception { + return value; + } + }).addSink(new ReceiveCheckNoOpSink()); + + coIt.closeWith(head.broadcast().union(otherSource)); + + head.addSink(new TestSink()).setParallelism(1); + + assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size()); + + env.execute(); + + Collections.sort(TestSink.collected); + assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } + } + + /** + * This test relies on the hash function used by the {@link DataStream#keyBy}, which is + * assumed to be {@link MathUtils#murmurHash}. + * + * For the test to pass all FlatMappers must see at least two records in the iteration, + * which can only be achieved if the hashed values of the input keys map to a complete + * congruence system. Given that the test is designed for 3 parallel FlatMapper instances + * keys chosen from the [1,3] range are a suitable choice. 
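+ *
+ * For example (editor's illustration): the selector below maps the inputs
+ * 1, 2, 3 to the key groups 1, 2, 0, and the test asserts that
+ * murmurHash(key) % 3 assigns these groups to three distinct subtasks.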
+ */ + @Test + public void testGroupByFeedback() throws Exception { + int numRetries = 5; + int timeoutScale = 1; + + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM - 1); + + KeySelector key = new KeySelector() { + + @Override + public Integer getKey(Integer value) throws Exception { + return value % 3; + } + }; + + DataStream source = env.fromElements(1, 2, 3) + .map(NoOpIntMap).name("ParallelizeMap"); + + IterativeStream it = source.keyBy(key).iterate(3000 * timeoutScale); + + DataStream head = it.flatMap(new RichFlatMapFunction() { + + int received = 0; + int key = -1; + + @Override + public void flatMap(Integer value, Collector out) throws Exception { + received++; + if (key == -1) { + key = MathUtils.murmurHash(value % 3) % 3; + } else { + assertEquals(key, MathUtils.murmurHash(value % 3) % 3); + } + if (value > 0) { + out.collect(value - 1); + } + } + + @Override + public void close() { + assertTrue(received > 1); + } + }); + + it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink()); + + env.execute(); + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } + } + + @SuppressWarnings("deprecation") + @Test + public void testWithCheckPointing() throws Exception { + int numRetries = 5; + int timeoutScale = 1; + + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.enableCheckpointing(); + + DataStream source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); + + + IterativeStream iteration = source.iterate(3000 * timeoutScale); + + iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink()); + + try { + env.execute(); + + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + // Test force checkpointing + + try { + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); + env.execute(); + + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); + env.getStreamGraph().getJobGraph(); + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } + } + + public static final class IterationHead extends RichFlatMapFunction { + public void flatMap(Boolean value, Collector out) throws Exception { + int indx = getRuntimeContext().getIndexOfThisSubtask(); + if (value) { + iterated[indx] = true; + } else { + out.collect(true); + } + } + } + + public static CoMapFunction NoOpCoMap = new CoMapFunction() { + + public String map1(Integer value) throws Exception { + return value.toString(); + } + + public String map2(String value) throws Exception { + return value; + } + }; + + public static MapFunction NoOpIntMap = new NoOpIntMap(); + + public static MapFunction NoOpStrMap = new MapFunction() { + + public String map(String value) throws Exception { + return value; + } + 
+ }; + + public static CoMapFunction NoOpIntCoMap = new CoMapFunction() { + + public Integer map1(Integer value) throws Exception { + return value; + } + + public Integer map2(Integer value) throws Exception { + return value; + } + + }; + + public static MapFunction NoOpBoolMap = new MapFunction() { + + public Boolean map(Boolean value) throws Exception { + return value; + } + + }; + + public static class TestSink implements SinkFunction { + + private static final long serialVersionUID = 1L; + public static List collected = new ArrayList(); + + @Override + public void invoke(String value) throws Exception { + collected.add(value); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java new file mode 100644 index 0000000..0902a3c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class OutputSplitterITCase extends StreamingMultipleProgramsTestBase { + + private static ArrayList expectedSplitterResult = new ArrayList(); + + @SuppressWarnings("unchecked") + @Test + public void testOnMergedDataStream() throws Exception { + TestListResultSink splitterResultSink1 = new TestListResultSink(); + TestListResultSink splitterResultSink2 = new TestListResultSink(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setBufferTimeout(1); + + DataStream d1 = env.fromElements(0, 2, 4, 6, 8); + DataStream d2 = env.fromElements(1, 3, 5, 7, 9); + + d1 = d1.union(d2); + + d1.split(new OutputSelector() { + private static final long serialVersionUID = 8354166915727490130L; + + @Override + public Iterable select(Integer value) { + List s = new ArrayList(); + if (value > 4) { + s.add(">"); + } else { + s.add("<"); + } + return s; + } + }).select(">").addSink(splitterResultSink1); + + d1.split(new OutputSelector() { + private static final long serialVersionUID = -6822487543355994807L; + + @Override + public Iterable select(Integer value) { + List s = new ArrayList(); + if (value % 3 == 0) { + s.add("yes"); + } else { + s.add("no"); + } + return s; + } + }).select("yes").addSink(splitterResultSink2); + env.execute(); + + expectedSplitterResult.clear(); + expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9)); + assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult()); + + expectedSplitterResult.clear(); + expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9)); + assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult()); + } + + @Test + public void testOnSingleDataStream() throws Exception { + TestListResultSink splitterResultSink1 = new TestListResultSink(); + TestListResultSink splitterResultSink2 = new TestListResultSink(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setBufferTimeout(1); + + DataStream ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + ds.split(new OutputSelector() { + private static final long serialVersionUID = 2524335410904414121L; + + @Override + public Iterable select(Integer value) { + List s = new ArrayList(); + if (value % 2 == 0) { + s.add("even"); + } else { + s.add("odd"); + } + return s; + } + }).select("even").addSink(splitterResultSink1); + + ds.split(new OutputSelector() { + + private static final long serialVersionUID = -511693919586034092L; + + @Override + public Iterable select(Integer value) { + List s = new ArrayList(); + if (value % 4 == 0) { + s.add("yes"); + } else { + s.add("no"); + } + return s; + } + }).select("yes").addSink(splitterResultSink2); + env.execute(); + + expectedSplitterResult.clear(); + expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8)); + assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult()); + + expectedSplitterResult.clear(); + 
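+		// Editor's note: the same stream may be split by several independent
+		// OutputSelectors; this second selector tags multiples of 4 with
+		// "yes", so 0, 4 and 8 are the values expected at splitterResultSink2.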
expectedSplitterResult.addAll(Arrays.asList(0, 4, 8)); + assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java new file mode 100644 index 0000000..bff8df1 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.test.streaming.runtime.util.NoOpIntMap; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * IT case that tests the different stream partitioning schemes. 
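+ *
+ * <p>Covered schemes (editor's summary): keyBy (hash), partitionCustom,
+ * broadcast, rebalance, forward and global.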
+ */ +@SuppressWarnings("serial") +public class PartitionerITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = UnsupportedOperationException.class) + public void testForwardFailsLowToHighParallelism() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream src = env.fromElements(1, 2, 3); + + // this doesn't work because it goes from 1 to 3 + src.forward().map(new NoOpIntMap()); + + env.execute(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testForwardFailsHightToLowParallelism() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // this does a rebalance that works + DataStream src = env.fromElements(1, 2, 3).map(new NoOpIntMap()); + + // this doesn't work because it goes from 3 to 1 + src.forward().map(new NoOpIntMap()).setParallelism(1); + + env.execute(); + } + + + @Test + public void partitionerTest() { + + TestListResultSink> hashPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> customPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> broadcastPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> forwardPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> rebalancePartitionResultSink = + new TestListResultSink>(); + TestListResultSink> globalPartitionResultSink = + new TestListResultSink>(); + + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + + DataStream> src = env.fromElements( + new Tuple1("a"), + new Tuple1("b"), + new Tuple1("b"), + new Tuple1("a"), + new Tuple1("a"), + new Tuple1("c"), + new Tuple1("a") + ); + + // partition by hash + src + .keyBy(0) + .map(new SubtaskIndexAssigner()) + .addSink(hashPartitionResultSink); + + // partition custom + DataStream> partitionCustom = src + .partitionCustom(new Partitioner() { + @Override + public int partition(String key, int numPartitions) { + if (key.equals("c")) { + return 2; + } else { + return 0; + } + } + }, 0) + .map(new SubtaskIndexAssigner()); + + partitionCustom.addSink(customPartitionResultSink); + + // partition broadcast + src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink); + + // partition rebalance + src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink); + + // partition forward + src.map(new MapFunction, Tuple1>() { + private static final long serialVersionUID = 1L; + @Override + public Tuple1 map(Tuple1 value) throws Exception { + return value; + } + }) + .forward() + .map(new SubtaskIndexAssigner()) + .addSink(forwardPartitionResultSink); + + // partition global + src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink); + + try { + env.execute(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + List> hashPartitionResult = hashPartitionResultSink.getResult(); + List> customPartitionResult = customPartitionResultSink.getResult(); + List> broadcastPartitionResult = broadcastPartitionResultSink.getResult(); + List> forwardPartitionResult = forwardPartitionResultSink.getResult(); + List> rebalancePartitionResult = rebalancePartitionResultSink.getResult(); + List> globalPartitionResult = globalPartitionResultSink.getResult(); + + verifyHashPartitioning(hashPartitionResult); + verifyCustomPartitioning(customPartitionResult); + 
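+		// Editor's note: broadcast must deliver every element to all three
+		// subtasks, rebalance/forward each element to exactly one subtask, and
+		// global only to subtask 0; the verify helpers below encode this.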

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
new file mode 100644
index 0000000..d33a2b1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SelfConnectionITCase extends StreamingMultipleProgramsTestBase {
+
+    /**
+     * We connect two different data streams in a chain to a CoMap.
+     */
+    @Test
+    public void differentDataStreamSameChain() throws Exception {
+
+        TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<Integer> src = env.fromElements(1, 3, 5);
+
+        DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public String map(Integer value) throws Exception {
+                return "x " + value;
+            }
+        });
+
+        stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public String map1(String value) {
+                return value;
+            }
+
+            @Override
+            public String map2(Integer value) {
+                return String.valueOf(value + 1);
+            }
+        }).addSink(resultSink);
+
+        env.execute();
+
+        List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
+
+        List<String> result = resultSink.getResult();
+
+        Collections.sort(expected);
+        Collections.sort(result);
+
+        assertEquals(expected, result);
+    }
+
+    /**
+     * We connect two different data streams in different chains to a CoMap
+     * (so this is not an actual self-connection).
+     */
+    @Test
+    public void differentDataStreamDifferentChain() {
+
+        TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(3);
+
+        DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
+
+        DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public void flatMap(Integer value, Collector<String> out) throws Exception {
+                out.collect("x " + value);
+            }
+        }).keyBy(new KeySelector<String, Integer>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Integer getKey(String value) throws Exception {
+                return value.length();
+            }
+        });
+
+        DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Long map(Integer value) throws Exception {
+                return (long) (value + 1);
+            }
+        }).keyBy(new KeySelector<Long, Integer>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Integer getKey(Long value) throws Exception {
+                return value.intValue();
+            }
+        });
+
+        stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public String map1(String value) {
+                return value;
+            }
+
+            @Override
+            public String map2(Long value) {
+                return value.toString();
+            }
+        }).addSink(resultSink);
+
+        try {
+            env.execute();
+        } catch (Exception e) {
+            // fail the test instead of silently swallowing a job failure
+            fail(e.getMessage());
+        }
+
+        List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
+        List<String> result = resultSink.getResult();
+
+        Collections.sort(expected);
+        Collections.sort(result);
+
+        assertEquals(expected, result);
+    }
+}
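The Javadoc above notes that neither test is an actual self-connection. For contrast, a true self-connect, where one stream feeds both inputs of the same CoMap, would look roughly like this minimal sketch, assuming the connect()/CoMapFunction API used above; the class name and data are hypothetical:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// hypothetical example class, not taken from the commit above
public class TrueSelfConnectExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> src = env.fromElements(1, 3, 5);

        // connecting src with itself: each element arrives once on each input,
        // so it passes once through map1 and once through map2
        src.connect(src).map(new CoMapFunction<Integer, Integer, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String map1(Integer value) {
                return "first: " + value;
            }

            @Override
            public String map2(Integer value) {
                return "second: " + value;
            }
        }).print();

        env.execute("self-connect example");
    }
}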

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
new file mode 100644
index 0000000..6288946
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.fail;
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+    /**
+     * Verify that the user-specified state backend is used even if checkpointing is disabled.
+     */
+    @Test
+    public void testStateBackendWithoutCheckpointing() throws Exception {
+
+        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+        see.setParallelism(1);
+
+        see.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        see.setStateBackend(new FailingStateBackend());
+
+        see.fromElements(new Tuple2<>("Hello", 1))
+            .keyBy(0)
+            .map(new RichMapFunction<Tuple2<String, Integer>, String>() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public void open(Configuration parameters) throws Exception {
+                    super.open(parameters);
+                    // touching keyed state forces the backend to be initialized
+                    getRuntimeContext().getKeyValueState("test", String.class, "");
+                }
+
+                @Override
+                public String map(Tuple2<String, Integer> value) throws Exception {
+                    return value.f0;
+                }
+            })
+            .print();
+
+        try {
+            see.execute();
+            fail();
+        }
+        catch (JobExecutionException e) {
+            // rethrow unless the failure was caused by the expected SuccessException
+            Throwable t = e.getCause();
+            if (!(t != null && t.getCause() instanceof SuccessException)) {
+                throw e;
+            }
+        }
+    }
+
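+    // The FailingStateBackend below throws SuccessException as soon as the
+    // runtime tries to initialize it. Seeing that exception as the nested
+    // failure cause therefore proves the user-specified backend was actually
+    // wired in, even though checkpointing was never enabled.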
+    public static class FailingStateBackend extends AbstractStateBackend {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
+            throw new SuccessException();
+        }
+
+        @Override
+        public void disposeAllStateForCurrentJob() throws Exception {}
+
+        @Override
+        public void close() throws Exception {}
+
+        @Override
+        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+            return null;
+        }
+
+        @Override
+        protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+            return null;
+        }
+
+        @Override
+        protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+            return null;
+        }
+
+        @Override
+        protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+                FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+            return null;
+        }
+
+        @Override
+        public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
+                long timestamp) throws Exception {
+            return null;
+        }
+
+        @Override
+        public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
+                long checkpointID,
+                long timestamp) throws Exception {
+            return null;
+        }
+    }
+
+    static final class SuccessException extends Exception {
+        private static final long serialVersionUID = -9218191172606739598L;
+    }
+
+}
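A real job would hand setStateBackend() a working backend instead of a deliberately failing one. A minimal sketch, assuming the FsStateBackend available in this generation of the API; the checkpoint path and interval are hypothetical:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// hypothetical example class, not taken from the commit above
public class StateBackendConfigExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the backend is used for keyed state even without checkpointing ...
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // ... but state snapshots are only drawn once checkpointing is enabled
        env.enableCheckpointing(5000L);

        env.fromElements("Hello", "world").print();
        env.execute("state backend configuration example");
    }
}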