flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-8732] [flip6] Cancel ongoing scheduling operation
Date Thu, 22 Feb 2018 16:22:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1e315f0dd -> 519639c64


[FLINK-8732] [flip6] Cancel ongoing scheduling operation

Keeps track of ongoing scheduling operations in the ExecutionGraph and cancels
them in case of a concurrent cancel, suspend or fail call. This makes sure that
the original cause for termination is maintained.

This closes #5548.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/519639c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/519639c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/519639c6

Branch: refs/heads/master
Commit: 519639c64039563ac4f2a875a8cfa630b25e4e8b
Parents: 1e315f0
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Feb 21 15:57:50 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 22 17:22:09 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  20 ++--
 .../runtime/executiongraph/ExecutionGraph.java  | 101 +++++++++++++++----
 .../executiongraph/ExecutionJobVertex.java      |  10 +-
 .../runtime/executiongraph/ExecutionVertex.java |   5 +-
 .../ExecutionGraphSchedulingTest.java           |  53 ++++++++++
 .../runtime/executiongraph/ExecutionTest.java   |   2 +-
 .../runtime/jobmaster/TestingLogicalSlot.java   |  28 +++--
 7 files changed, 176 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 14d88c3..3e77d3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -358,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 
-	public boolean scheduleForExecution() {
+	public CompletableFuture<Void> scheduleForExecution() {
 		final ExecutionGraph executionGraph = getVertex().getExecutionGraph();
 		final SlotProvider resourceProvider = executionGraph.getSlotProvider();
 		final boolean allowQueued = executionGraph.isQueuedSchedulingAllowed();
@@ -377,14 +377,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
 	 * @param locationPreferenceConstraint constraint for the location preferences
-	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is
the only state that permits scheduling.
+	 * @return Future which is completed once the Execution has been deployed
 	 */
-	public boolean scheduleForExecution(
+	public CompletableFuture<Void> scheduleForExecution(
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint) {
+		final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
 		try {
-			final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
 			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
 				slotProvider,
 				queued,
@@ -395,11 +395,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// that we directly deploy the tasks if the slot allocation future is completed. This
is
 			// necessary for immediate deployment.
 			final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
-				(Execution ignored, Throwable throwable) ->  {
+				(Execution ignored, Throwable throwable) -> {
 					if (throwable != null) {
 						markFailed(ExceptionUtils.stripCompletionException(throwable));
-					}
-					else {
+					} else {
 						try {
 							deploy();
 						} catch (Throwable t) {
@@ -415,10 +414,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation
future has not been completed yet."));
 			}
 
-			return true;
-		}
-		catch (IllegalExecutionStateException e) {
-			return false;
+			return deploymentFuture;
+		} catch (IllegalExecutionStateException e) {
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9331b2e..22e5c92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -77,6 +77,8 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -90,6 +92,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -270,6 +273,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 * strong reference to any user-defined classes.*/
 	private volatile ErrorInfo failureInfo;
 
+	/**
+	 * Future for an ongoing or completed scheduling action.
+	 */
+	@Nullable
+	private volatile CompletableFuture<Void> schedulingFuture;
+
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving
 -------
 
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@@ -409,6 +418,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		// the failover strategy must be instantiated last, so that the execution graph
 		// is ready by the time the failover strategy sees it
 		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover
strategy");
+
+		this.schedulingFuture = null;
 		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 	}
 
@@ -857,37 +868,60 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 	public void scheduleForExecution() throws JobException {
 
+		final long currentGlobalModVersion = globalModVersion;
+
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
 
+			final CompletableFuture<Void> newSchedulingFuture;
+
 			switch (scheduleMode) {
 
 				case LAZY_FROM_SOURCES:
-					scheduleLazy(slotProvider);
+					newSchedulingFuture = scheduleLazy(slotProvider);
 					break;
 
 				case EAGER:
-					scheduleEager(slotProvider, allocationTimeout);
+					newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
 					break;
 
 				default:
 					throw new JobException("Schedule mode is invalid.");
 			}
+
+			if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion)
{
+				schedulingFuture = newSchedulingFuture.whenCompleteAsync(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							failGlobal(ExceptionUtils.stripCompletionException(throwable));
+						}
+					},
+					futureExecutor);
+			} else {
+				newSchedulingFuture.cancel(false);
+			}
 		}
 		else {
 			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
 		}
 	}
 
-	private void scheduleLazy(SlotProvider slotProvider) {
+	private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
+
+		final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
+
 		// simply take the vertices without inputs.
 		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 			if (ejv.getJobVertex().isInputVertex()) {
-				ejv.scheduleAll(
+				final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 					slotProvider,
 					allowQueuedScheduling,
 					LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location
preferences should be empty
+
+				schedulingFutures.add(schedulingJobVertexFuture);
 			}
 		}
+
+		return FutureUtils.waitForAll(schedulingFutures);
 	}
 
 	/**
@@ -896,8 +930,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 * @param slotProvider  The resource provider from which the slots are allocated
 	 * @param timeout       The maximum time that the deployment may take, before a
 	 *                      TimeoutException is thrown.
+	 * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
+	 * The future can also be completed exceptionally if an error happened.
 	 */
-	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+	private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time
timeout) {
 		checkState(state == JobStatus.RUNNING, "job is not running currently");
 
 		// Important: reserve all the space we need up front.
@@ -925,9 +961,23 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		// the future fails once one slot future fails.
 		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
 
-		allAllocationsFuture.whenCompleteAsync(
-			(Collection<Execution> allAllocations, Throwable throwable) -> {
-				if (throwable != null) {
+		return allAllocationsFuture
+			.thenAccept(
+				(Collection<Execution> executionsToDeploy) -> {
+					for (Execution execution : executionsToDeploy) {
+						try {
+							execution.deploy();
+						} catch (Throwable t) {
+							throw new CompletionException(
+								new FlinkException(
+									String.format("Could not deploy execution %s.", execution),
+									t));
+						}
+					}
+			})
+			// Generate a more specific failure message for the eager scheduling
+			.exceptionally(
+				(Throwable throwable) -> {
 					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
 					final Throwable resultThrowable;
 
@@ -942,18 +992,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 						resultThrowable = strippedThrowable;
 					}
 
-					failGlobal(resultThrowable);
-				} else {
-					try {
-						// successfully obtained all slots, now deploy
-						for (Execution execution : allAllocations) {
-							execution.deploy();
-						}
-					} catch (Throwable t) {
-						failGlobal(new FlinkException("Could not deploy executions.", t));
-					}
-				}
-			}, futureExecutor);
+					throw new CompletionException(resultThrowable);
+				});
 	}
 
 	public void cancel() {
@@ -966,6 +1006,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 					// make sure no concurrent local actions interfere with the cancellation
 					final long globalVersionForRestart = incrementGlobalModVersion();
 
+					final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;
+
+					// cancel ongoing scheduling action
+					if (ongoingSchedulingFuture != null) {
+						ongoingSchedulingFuture.cancel(false);
+					}
+
 					final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
 					// cancel all tasks (that still need cancelling)
@@ -1057,6 +1104,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
 
+				final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;
+
+				// cancel ongoing scheduling action
+				if (ongoingSchedulingFuture != null) {
+					ongoingSchedulingFuture.cancel(false);
+				}
+
 				for (ExecutionJobVertex ejv: verticesInCreationOrder) {
 					ejv.cancel();
 				}
@@ -1108,6 +1162,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 				// make sure no concurrent local or global actions interfere with the failover
 				final long globalVersionForRestart = incrementGlobalModVersion();
 
+				final CompletableFuture<Void> ongoingSchedulingFuture = schedulingFuture;
+
+				// cancel ongoing scheduling action
+				if (ongoingSchedulingFuture != null) {
+					ongoingSchedulingFuture.cancel(false);
+				}
+
 				// we build a future that is complete once all vertices have reached a terminal state
 				final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index db336f5..6e578fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -472,18 +473,23 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex,
Archiveable
 	 * @param slotProvider to allocate the slots from
 	 * @param queued if the allocations can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @return Future which is completed once all {@link Execution} could be deployed
 	 */
-	public void scheduleAll(
+	public CompletableFuture<Void> scheduleAll(
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint) {
 		
 		final ExecutionVertex[] vertices = this.taskVertices;
 
+		final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
+
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
-			ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
+			scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint));
 		}
+
+		return FutureUtils.waitForAll(scheduleFutures);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 88923fb..f13e42c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -607,9 +607,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * @param slotProvider to allocate the slots from
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
-	 * @return
+	 * @return Future which is completed once the execution is deployed. The future
+	 * can also completed exceptionally.
 	 */
-	public boolean scheduleForExecution(
+	public CompletableFuture<Void> scheduleForExecution(
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint) {

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b16aa96..88ba446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -28,8 +28,10 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -40,8 +42,11 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
@@ -412,6 +417,54 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
 	}
 
+	/**
+	 * Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph}
+	 * if it gets concurrently cancelled
+	 */
+	@Test
+	public void testSchedulingOperationCancellationWhenCancel() throws Exception {
+		final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+		jobVertex.setParallelism(2);
+		final JobGraph jobGraph = new JobGraph(jobVertex);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		final CompletableFuture<LogicalSlot> slotFuture1 = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> slotFuture2 = new CompletableFuture<>();
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
+		slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
+		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
+
+		executionGraph.scheduleForExecution();
+
+		final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+		final TestingLogicalSlot slot = new TestingLogicalSlot(
+			new LocalTaskManagerLocation(),
+			new SimpleAckingTaskManagerGateway(),
+			0,
+			new AllocationID(),
+			new SlotRequestId(),
+			new SlotSharingGroupId(),
+			releaseFuture);
+		slotFuture1.complete(slot);
+
+		// cancel should change the state of all executions to CANCELLED
+		executionGraph.cancel();
+
+		// complete the now CANCELLED execution --> this should cause a failure
+		slotFuture2.complete(new TestingLogicalSlot());
+
+		Thread.sleep(1L);
+		// release the first slot to finish the cancellation
+		releaseFuture.complete(null);
+
+		// NOTE: This test will only occasionally fail without the fix since there is
+		// a race between the releaseFuture and the slotFuture2
+		assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.CANCELED));
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index edae2c7..38518d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -304,7 +304,7 @@ public class ExecutionTest extends TestLogger {
 
 		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
 
-		assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY));
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
 
 		Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/519639c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index e20700e..25f296d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -45,7 +45,10 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	private final int slotNumber;
 
-	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+	private final CompletableFuture<?> releaseFuture;
+
+	@Nullable
+	private final CompletableFuture<?> customReleaseFuture;
 	
 	private final AllocationID allocationId;
 
@@ -60,7 +63,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 			0,
 			new AllocationID(),
 			new SlotRequestId(),
-			new SlotSharingGroupId());
+			new SlotSharingGroupId(),
+			null);
 	}
 
 	public TestingLogicalSlot(
@@ -69,7 +73,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 			int slotNumber,
 			AllocationID allocationId,
 			SlotRequestId slotRequestId,
-			SlotSharingGroupId slotSharingGroupId) {
+			SlotSharingGroupId slotSharingGroupId,
+			@Nullable CompletableFuture<?> customReleaseFuture) {
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
 		this.payloadReference = new AtomicReference<>();
@@ -77,6 +82,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 		this.allocationId = Preconditions.checkNotNull(allocationId);
 		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
+		this.releaseFuture = new CompletableFuture<>();
+		this.customReleaseFuture = customReleaseFuture;
 	}
 
 	@Override
@@ -96,7 +103,11 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Override
 	public boolean isAlive() {
-		return !releaseFuture.isDone();
+		if (customReleaseFuture != null) {
+			return !customReleaseFuture.isDone();
+		} else {
+			return !releaseFuture.isDone();
+		}
 	}
 
 	@Override
@@ -112,9 +123,12 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Override
 	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-		releaseFuture.complete(null);
-
-		return releaseFuture;
+		if (customReleaseFuture != null) {
+			return customReleaseFuture;
+		} else {
+			releaseFuture.complete(null);
+			return releaseFuture;
+		}
 	}
 
 	@Override


Mime
View raw message