flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-7352] [tests] Stabilize ExecutionGraphRestartTest
Date Thu, 10 Aug 2017 09:21:00 GMT
[FLINK-7352] [tests] Stabilize ExecutionGraphRestartTest

Introduce an explicit waiting for the deployment of tasks. This replaces the loose
ordering induced by Thread.sleep and fixes the race conditions caused by it.

This closes #4501.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f59de67d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f59de67d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f59de67d

Branch: refs/heads/master
Commit: f59de67d9bd440b40352fdea5ede7c709f991a9e
Parents: f9db6fe
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 9 09:57:56 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Aug 10 11:17:53 2017 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 112 ++++++++++++++++---
 .../executiongraph/ExecutionGraphTestUtils.java |  31 ++---
 .../utils/SimpleAckingTaskManagerGateway.java   |  13 +++
 3 files changed, 118 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 7275e0f..acf854f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -70,18 +70,20 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -585,11 +587,24 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testConcurrentLocalFailAndRestart() throws Exception {
-		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
+		final int parallelism = 10;
+		SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+		final ExecutionGraph eg = createSimpleTestGraph(
+			new JobID(),
+			taskManagerGateway,
+			new FixedDelayRestartStrategy(10, 0L),
+			createNoOpVertex(parallelism));
+
+		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
+		taskManagerGateway.setCondition(waitForTasks);
+
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
 
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+		switchToRunning(eg);
 
 		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
 		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
@@ -629,10 +644,17 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		failTrigger.trigger();
 
 		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
+
+		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
+		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+
 		completeCancellingForAllVertices(eg);
 
 		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		waitForTasksAfterRestart.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+		switchToRunning(eg);
 		finishAllVertices(eg);
 
 		eg.waitUntilTerminal();
@@ -646,18 +668,29 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final int parallelism = 10;
 		final JobID jid = new JobID();
 		final JobVertex vertex = createNoOpVertex(parallelism);
-		final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, new NotCancelAckingTaskGateway());
+		final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway();
+		final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway);
 		final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger);
 
 		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+
+		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
+		taskManagerGateway.setCondition(waitForTasks);
+
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
 
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+		switchToRunning(eg);
 
 		// fail into 'RESTARTING'
 		eg.failGlobal(new Exception("intended test failure 1"));
 		assertEquals(JobStatus.FAILING, eg.getState());
+
+		WaitForTasks waitForTasksRestart = new WaitForTasks(parallelism);
+		taskManagerGateway.setCondition(waitForTasksRestart);
+
 		completeCancellingForAllVertices(eg);
 		waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
 
@@ -668,7 +701,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		restartTrigger.trigger();
 
 		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		waitForTasksRestart.getFuture().get(1000, TimeUnit.MILLISECONDS);
+		switchToRunning(eg);
 		finishAllVertices(eg);
 
 		eg.waitUntilTerminal();
@@ -684,8 +719,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		// this test is inconclusive if not used with a proper multi-threaded executor
 		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize()
> 1);
 
+		SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
 		final int parallelism = 20;
-		final Scheduler scheduler = createSchedulerWithInstances(parallelism);
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism, taskManagerGateway);
 
 		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
@@ -701,23 +737,34 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
 		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
-				new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor,
source, sink);
+			new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor,
source, sink);
+
+		WaitForTasks waitForTasks = new WaitForTasks(parallelism * 2);
+		taskManagerGateway.setCondition(waitForTasks);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
 
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+		switchToRunning(eg);
 
 		// fail into 'RESTARTING'
 		eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
-				new Exception("intended test failure"));
+			new Exception("intended test failure"));
 
 		assertEquals(JobStatus.FAILING, eg.getState());
+
+		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism * 2);
+		taskManagerGateway.setCondition(waitForTasksAfterRestart);
+
 		completeCancellingForAllVertices(eg);
 
 		// clean termination
 		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		waitForTasksAfterRestart.getFuture().get(1000, TimeUnit.MILLISECONDS);
+		switchToRunning(eg);
 		finishAllVertices(eg);
 		waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
 	}
@@ -730,7 +777,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final int numRestarts = 10;
 		final int parallelism = 20;
 
-		final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1);
+		TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1, taskManagerGateway);
 
 		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
@@ -768,24 +816,23 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private Scheduler createSchedulerWithInstances(int num) {
+	private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway)
{
 		final Scheduler scheduler = new Scheduler(executor);
 		final Instance[] instances = new Instance[num];
 
 		for (int i = 0; i < instances.length; i++) {
-			instances[i] = createInstance(55443 + i);
+			instances[i] = createInstance(taskManagerGateway, 55443 + i);
 			scheduler.newInstanceAvailable(instances[i]);
 		}
 
 		return scheduler;
 	}
 
-	private static Instance createInstance(int port) {
+	private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port)
{
 		final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000,
400_000_000);
-		final TaskManagerGateway taskManager = new SimpleAckingTaskManagerGateway();
 		final TaskManagerLocation location = new TaskManagerLocation(
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
-		return new Instance(taskManager, location, new InstanceID(), resources, 1);
+		return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1);
 	}
 
 	// ------------------------------------------------------------------------
@@ -992,4 +1039,33 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			});
 		}
 	}
+
+	/**
+	 * A consumer which counts the number of tasks for which it has been called and completes
a future
+	 * upon reaching the number of tasks to wait for.
+	 */
+	public static class WaitForTasks implements Consumer<ExecutionAttemptID> {
+
+		private final int tasksToWaitFor;
+		private final CompletableFuture<Boolean> allTasksReceived;
+		private int counter;
+
+		public WaitForTasks(int tasksToWaitFor) {
+			this.tasksToWaitFor = tasksToWaitFor;
+			this.allTasksReceived = new CompletableFuture<>();
+		}
+
+		public CompletableFuture<Boolean> getFuture() {
+			return allTasksReceived;
+		}
+
+		@Override
+		public void accept(ExecutionAttemptID executionAttemptID) {
+			counter++;
+
+			if (counter >= tasksToWaitFor) {
+				allTasksReceived.complete(true);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a6b5a4b..2daf28f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -190,28 +191,17 @@ public class ExecutionGraphTestUtils {
 	}
 
 	/**
-	 * Turns a newly scheduled execution graph into a state where all vertices run.
-	 * This waits until all executions have reached state 'DEPLOYING' and then switches them
to running.
+	 * Checks that all execution are in state DEPLOYING and then switches them
+	 * to state RUNNING
 	 */
-	public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout)
throws TimeoutException {
-		// wait until everything is running
+	public static void switchToRunning(ExecutionGraph eg) {
+		// check that all execution are in state DEPLOYING
 		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
 			final Execution exec = ev.getCurrentExecutionAttempt();
-			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
-		}
-
-		// Note: As ugly as it is, we need this minor sleep, because between switching
-		// to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check
-		// against concurrent modifications (cancel / fail). We can only switch this to running
-		// once that check is passed. For the actual runtime, this switch is triggered by a callback
-		// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
-		// which cannot easily tell us when that condition has happened, unfortunately.
-		try {
-			Thread.sleep(2);
-		} catch (InterruptedException e) {
-			Thread.currentThread().interrupt();
+			assert(exec.getState() == ExecutionState.DEPLOYING);
 		}
 
+		// switch executions to RUNNING
 		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
 			final Execution exec = ev.getCurrentExecutionAttempt();
 			exec.switchToRunning();
@@ -285,7 +275,7 @@ public class ExecutionGraphTestUtils {
 	public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws
Exception {
 		JobVertex vertex = createNoOpVertex(10);
 
-		return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
+		return createSimpleTestGraph(new JobID(), new SimpleAckingTaskManagerGateway(), restartStrategy,
vertex);
 	}
 
 	/**
@@ -294,7 +284,7 @@ public class ExecutionGraphTestUtils {
 	 * <p>The execution graph uses {@link NoRestartStrategy} as the restart strategy.
 	 */
 	public static ExecutionGraph createSimpleTestGraph(JobID jid, JobVertex... vertices) throws
Exception {
-		return createSimpleTestGraph(jid, new NoRestartStrategy(), vertices);
+		return createSimpleTestGraph(jid, new SimpleAckingTaskManagerGateway(), new NoRestartStrategy(),
vertices);
 	}
 
 	/**
@@ -302,6 +292,7 @@ public class ExecutionGraphTestUtils {
 	 */
 	public static ExecutionGraph createSimpleTestGraph(
 			JobID jid,
+			TaskManagerGateway taskManagerGateway,
 			RestartStrategy restartStrategy,
 			JobVertex... vertices) throws Exception {
 
@@ -310,7 +301,7 @@ public class ExecutionGraphTestUtils {
 			numSlotsNeeded += vertex.getParallelism();
 		}
 
-		SlotProvider slotProvider = new SimpleSlotProvider(jid, numSlotsNeeded);
+		SlotProvider slotProvider = new SimpleSlotProvider(jid, numSlotsNeeded, taskManagerGateway);
 
 		return createSimpleTestGraph(jid, slotProvider, restartStrategy, vertices);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index b968d39..a38f674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -33,8 +33,10 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and
does not
@@ -44,6 +46,16 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway
{
 
 	private final String address = UUID.randomUUID().toString();
 
+	private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
+
+	public SimpleAckingTaskManagerGateway() {
+		optSubmitCondition = Optional.empty();
+	}
+
+	public void setCondition(Consumer<ExecutionAttemptID> predicate) {
+		optSubmitCondition = Optional.of(predicate);
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
@@ -73,6 +85,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway
{
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time
timeout) {
+		optSubmitCondition.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 


Mime
View raw message