flink-commits mailing list archives

From: u..@apache.org
Subject: flink git commit: [hotfix] [streaming-java] Harden IterateTest
Date: Thu, 09 Jun 2016 11:55:20 GMT
Repository: flink
Updated Branches:
  refs/heads/master c532638a0 -> 776253cbb


[hotfix] [streaming-java] Harden IterateTest

Adds retries and timeout scaling to all iteration tests that rely
on iteration timeouts. The way the tests rely on these timeouts is
prone to races. If the failures occur again, I vote to ignore the
tests until iteration termination is fixed properly.

Example test failures:
- https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215892/log.txt
- https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215975/log.txt

This closes #2087.
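
For reference, every affected test is wrapped in the same retry loop that doubles
the iteration timeout after each failed attempt. The condensed Java sketch below
shows that pattern in isolation; the RetryWithTimeoutScalingSketch class and the
TestBody.runOnce(...) callback are hypothetical stand-ins introduced only for
illustration, while the real tests in the diff below inline their pipeline setup
and pass the scaled timeout to iterate(...) before calling env.execute().

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryWithTimeoutScalingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(RetryWithTimeoutScalingSketch.class);

	// Hypothetical stand-in for one test body: builds the pipeline with the given
	// iteration timeout and executes it (the real tests inline this directly).
	interface TestBody {
		void runOnce(long iterationTimeoutMillis) throws Exception;
	}

	static void runWithRetries(TestBody test) throws Exception {
		int numRetries = 5;
		int timeoutScale = 1;

		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
			try {
				// Same base timeout as the tests, scaled up on every retry.
				test.runOnce(3000 * timeoutScale);
				break; // success
			} catch (Throwable t) {
				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);

				if (numRetry >= numRetries - 1) {
					throw t; // out of retries, surface the last failure
				} else {
					timeoutScale *= 2; // double the iteration timeout before retrying
				}
			}
		}
	}
}

In the actual diff each test keeps its own copy of this loop rather than sharing a
helper, which keeps the individual test bodies self-contained.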


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776253cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776253cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776253cb

Branch: refs/heads/master
Commit: 776253cbb0574337b0736dc27711007821ba3d2c
Parents: c532638
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Jun 9 11:34:34 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Jun 9 13:54:49 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/IterateTest.java | 327 ++++++++++++-------
 1 file changed, 201 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/776253cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c4343f6..c6875dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.api;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -29,7 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.util.MathUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -52,13 +46,25 @@ import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.MathUtils;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings({ "unchecked", "unused", "serial" })
 public class IterateTest extends StreamingMultipleProgramsTestBase {
 
+	private static final Logger LOG = LoggerFactory.getLogger(IterateTest.class);
+
 	private static boolean iterated[];
 
 	@Test(expected = UnsupportedOperationException.class)
@@ -366,100 +372,135 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 	@SuppressWarnings("rawtypes")
 	@Test
 	public void testSimpleIteration() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		iterated = new boolean[DEFAULT_PARALLELISM];
+		int numRetries = 5;
+		int timeoutScale = 1;
 
-		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-				.map(NoOpBoolMap).name("ParallelizeMap");
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				iterated = new boolean[DEFAULT_PARALLELISM];
 
-		IterativeStream<Boolean> iteration = source.iterate(3000);
+				DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+						.map(NoOpBoolMap).name("ParallelizeMap");
 
-		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
+				IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
 
-		iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
+				DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
 
-		iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
+				iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
 
-		env.execute();
+				iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
 
-		for (boolean iter : iterated) {
-			assertTrue(iter);
-		}
+				env.execute();
 
+				for (boolean iter : iterated) {
+					assertTrue(iter);
+				}
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
 	}
 
 	@Test
 	public void testCoIteration() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
+		int numRetries = 5;
+		int timeoutScale = 1;
 
-		DataStream<String> otherSource = env.fromElements("1000", "2000")
-				.map(NoOpStrMap).name("ParallelizeMap");
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				TestSink.collected = new ArrayList<>();
 
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				env.setParallelism(2);
 
-		ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
-				.map(NoOpIntMap).name("ParallelizeMap")
-				.iterate(2000)
-				.withFeedbackType("String");
+				DataStream<String> otherSource = env.fromElements("1000", "2000")
+						.map(NoOpStrMap).name("ParallelizeMap");
 
-		try {
-			coIt.keyBy(1, 2);
-			fail();
-		} catch (InvalidProgramException e) {
-			// this is expected
-		}
 
-		DataStream<String> head = coIt
-				.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
+				ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
+						.map(NoOpIntMap).name("ParallelizeMap")
+						.iterate(2000 * timeoutScale)
+						.withFeedbackType("String");
+
+				try {
+					coIt.keyBy(1, 2);
+					fail();
+				} catch (InvalidProgramException e) {
+					// this is expected
+				}
 
-					private static final long serialVersionUID = 1L;
-					boolean seenFromSource = false;
+				DataStream<String> head = coIt
+						.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
 
-					@Override
-					public void flatMap1(Integer value, Collector<String> out) throws Exception {
-						out.collect(((Integer) (value + 1)).toString());
-					}
+							private static final long serialVersionUID = 1L;
+							boolean seenFromSource = false;
+
+							@Override
+							public void flatMap1(Integer value, Collector<String> out) throws Exception {
+								out.collect(((Integer) (value + 1)).toString());
+							}
+
+							@Override
+							public void flatMap2(String value, Collector<String> out) throws Exception {
+								Integer intVal = Integer.valueOf(value);
+								if (intVal < 2) {
+									out.collect(((Integer) (intVal + 1)).toString());
+								}
+								if (intVal == 1000 || intVal == 2000) {
+									seenFromSource = true;
+								}
+							}
+
+							@Override
+							public void close() {
+								assertTrue(seenFromSource);
+							}
+						});
+
+				coIt.map(new CoMapFunction<Integer, String, String>() {
 
 					@Override
-					public void flatMap2(String value, Collector<String> out) throws Exception {
-						Integer intVal = Integer.valueOf(value);
-						if (intVal < 2) {
-							out.collect(((Integer) (intVal + 1)).toString());
-						}
-						if (intVal == 1000 || intVal == 2000) {
-							seenFromSource = true;
-						}
+					public String map1(Integer value) throws Exception {
+						return value.toString();
 					}
 
 					@Override
-					public void close() {
-						assertTrue(seenFromSource);
+					public String map2(String value) throws Exception {
+						return value;
 					}
-				});
+				}).addSink(new ReceiveCheckNoOpSink<String>());
 
-		coIt.map(new CoMapFunction<Integer, String, String>() {
+				coIt.closeWith(head.broadcast().union(otherSource));
 
-			@Override
-			public String map1(Integer value) throws Exception {
-				return value.toString();
-			}
+				head.addSink(new TestSink()).setParallelism(1);
 
-			@Override
-			public String map2(String value) throws Exception {
-				return value;
-			}
-		}).addSink(new ReceiveCheckNoOpSink<String>());
+				assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
 
-		coIt.closeWith(head.broadcast().union(otherSource));
+				env.execute();
 
-		head.addSink(new TestSink()).setParallelism(1);
+				Collections.sort(TestSink.collected);
+				assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
 
-		assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
-
-		env.execute();
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
 
-		Collections.sort(TestSink.collected);
-		assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
 	}
 
 	/**
@@ -473,89 +514,123 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
      */
 	@Test
 	public void testGroupByFeedback() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(DEFAULT_PARALLELISM - 1);
+		int numRetries = 5;
+		int timeoutScale = 1;
 
-		KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				env.setParallelism(DEFAULT_PARALLELISM - 1);
 
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value % 3;
-			}
-		};
+				KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
 
-		DataStream<Integer> source = env.fromElements(1, 2, 3)
-				.map(NoOpIntMap).name("ParallelizeMap");
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value % 3;
+					}
+				};
 
-		IterativeStream<Integer> it = source.keyBy(key).iterate(3000);
+				DataStream<Integer> source = env.fromElements(1, 2, 3)
+						.map(NoOpIntMap).name("ParallelizeMap");
 
-		DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
+				IterativeStream<Integer> it = source.keyBy(key).iterate(3000 * timeoutScale);
 
-			int received = 0;
-			int key = -1;
+				DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
 
-			@Override
-			public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-				received++;
-				if (key == -1) {
-					key = MathUtils.murmurHash(value % 3) % 3;
-				} else {
-					assertEquals(key, MathUtils.murmurHash(value % 3) % 3);
-				}
-				if (value > 0) {
-					out.collect(value - 1);
-				}
-			}
+					int received = 0;
+					int key = -1;
 
-			@Override
-			public void close() {
-				assertTrue(received > 1);
-			}
-		});
+					@Override
+					public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+						received++;
+						if (key == -1) {
+							key = MathUtils.murmurHash(value % 3) % 3;
+						} else {
+							assertEquals(key, MathUtils.murmurHash(value % 3) % 3);
+						}
+						if (value > 0) {
+							out.collect(value - 1);
+						}
+					}
 
-		it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
+					@Override
+					public void close() {
+						assertTrue(received > 1);
+					}
+				});
 
-		env.execute();
+				it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+				env.execute();
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
 	}
 
 	@SuppressWarnings("deprecation")
 	@Test
 	public void testWithCheckPointing() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		int numRetries = 5;
+		int timeoutScale = 1;
 
-		env.enableCheckpointing();
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStream<Boolean> source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
-				.map(NoOpBoolMap).name("ParallelizeMap");
+				env.enableCheckpointing();
 
+				DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+						.map(NoOpBoolMap).name("ParallelizeMap");
 
-		IterativeStream<Boolean> iteration = source.iterate(3000);
 
-		iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
+				IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
 
-		try {
-			env.execute();
+				iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
 
-			// this statement should never be reached
-			fail();
-		} catch (UnsupportedOperationException e) {
-			// expected behaviour
-		}
+				try {
+					env.execute();
 
-		// Test force checkpointing
+					// this statement should never be reached
+					fail();
+				} catch (UnsupportedOperationException e) {
+					// expected behaviour
+				}
 
-		try {
-			env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
-			env.execute();
+				// Test force checkpointing
 
-			// this statement should never be reached
-			fail();
-		} catch (UnsupportedOperationException e) {
-			// expected behaviour
-		}
+				try {
+					env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
+					env.execute();
+
+					// this statement should never be reached
+					fail();
+				} catch (UnsupportedOperationException e) {
+					// expected behaviour
+				}
+
+				env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
+				env.getStreamGraph().getJobGraph();
 
-		env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
-		env.getStreamGraph().getJobGraph();
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
 	}
 
 	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {

