flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/6] flink git commit: [tests] Add test for a program with very fast failure rates.
Date Mon, 25 Jan 2016 14:22:03 GMT
[tests] Add test for a program with very fast failure rates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4806158e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4806158e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4806158e

Branch: refs/heads/master
Commit: 4806158e4ad9d9c7bd4a471a2f7e1b4efe6b48ce
Parents: 6823895
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Jan 18 19:28:17 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jan 25 15:07:16 2016 +0100

----------------------------------------------------------------------
 .../flink/test/recovery/FastFailuresITCase.java | 87 ++++++++++++++++++++
 1 file changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4806158e/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
new file mode 100644
index 0000000..0684fde
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+public class FastFailuresITCase {
+
+	static final AtomicInteger FAILURES_SO_FAR = new AtomicInteger();
+	static final int NUM_FAILURES = 200;
+	
+	@Test
+	public void testThis() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.getConfig().setExecutionRetryDelay(0);
+		env.setParallelism(4);
+		env.enableCheckpointing(1000);
+
+		DataStream<Tuple2<Integer, Integer>> input = env.addSource(new RichSourceFunction<Tuple2<Integer,
Integer>>() {
+
+			@Override
+			public void open(Configuration parameters) {
+				if (FAILURES_SO_FAR.incrementAndGet() <= NUM_FAILURES) {
+					throw new RuntimeException("fail");
+				}
+			}
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx)  {}
+
+			@Override
+			public void cancel() {}
+		});
+
+		input
+				.keyBy(0)
+				.map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+					@Override
+					public Integer map(Tuple2<Integer, Integer> value) {
+						return value.f0;
+					}
+				})
+				.addSink(new SinkFunction<Integer>() {
+					@Override
+					public void invoke(Integer value) {}
+				});
+
+		try {
+			env.execute();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


Mime
View raw message