flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/9] flink git commit: [FLINK-8653] [flip6] Remove internal slot request timeout from SlotPool
Date Sun, 18 Feb 2018 09:13:58 GMT
[FLINK-8653] [flip6] Remove internal slot request timeout from SlotPool

Instead of using the internal slot request timeout to time out pending slot requests,
we use the timeout passed to SlotPool#allocateSlot to time out pending slot requests.

This closes #5483.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a88669
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a88669
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a88669

Branch: refs/heads/master
Commit: d8a8866973f7e0463047963e6b242cdc2cb82fec
Parents: d9d89ff
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Feb 14 12:09:01 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Feb 18 10:12:54 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  1 -
 .../runtime/jobmaster/slotpool/SlotPool.java    | 89 ++++++++++----------
 .../jobmaster/slotpool/SlotPoolRpcTest.java     | 72 ++++++----------
 .../jobmaster/slotpool/SlotPoolTest.java        | 34 +-------
 4 files changed, 74 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 139c053..dfa4d1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -276,7 +276,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements
JobMast
 			jobGraph.getJobID(),
 			SystemClock.getInstance(),
 			rpcTimeout,
-			jobMasterConfiguration.getSlotRequestTimeout(),
 			jobMasterConfiguration.getSlotIdleTimeout());
 
 		this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 72bb7e1..6d714e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -111,9 +112,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 	/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor).
*/
 	private final Time rpcTimeout;
 
-	/** Timeout for allocation round trips (RM -> launch TM -> offer slot). */
-	private final Time slotRequestTimeout;
-
 	/** Timeout for releasing idle slots. */
 	private final Time idleSlotTimeout;
 
@@ -139,7 +137,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			jobId,
 			SystemClock.getInstance(),
 			AkkaUtils.getDefaultTimeout(),
-			Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()),
 			Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
 	}
 
@@ -148,7 +145,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			JobID jobId,
 			Clock clock,
 			Time rpcTimeout,
-			Time slotRequestTimeout,
 			Time idleSlotTimeout) {
 
 		super(rpcService);
@@ -157,7 +153,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 		this.clock = checkNotNull(clock);
 		this.rpcTimeout = checkNotNull(rpcTimeout);
 		this.idleSlotTimeout = checkNotNull(idleSlotTimeout);
-		this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
 
 		this.registeredTaskManagers = new HashSet<>(16);
 		this.allocatedSlots = new AllocatedSlots();
@@ -307,7 +302,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			scheduledUnit,
 			resourceProfile,
 			locationPreferences,
-			allowQueuedScheduling);
+			allowQueuedScheduling,
+			timeout);
 	}
 
 	private CompletableFuture<LogicalSlot> internalAllocateSlot(
@@ -315,7 +311,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			ScheduledUnit task,
 			ResourceProfile resourceProfile,
 			Collection<TaskManagerLocation> locationPreferences,
-			boolean allowQueuedScheduling) {
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) {
 
 		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
 
@@ -337,13 +334,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 						multiTaskSlotManager,
 						resourceProfile,
 						locationPreferences,
-						allowQueuedScheduling);
+						allowQueuedScheduling,
+						allocationTimeout);
 				} else {
 					multiTaskSlotLocality = allocateMultiTaskSlot(
 						task.getJobVertexId(), multiTaskSlotManager,
 						resourceProfile,
 						locationPreferences,
-						allowQueuedScheduling);
+						allowQueuedScheduling,
+						allocationTimeout);
 				}
 			} catch (NoResourceAvailableException noResourceException) {
 				return FutureUtils.completedExceptionally(noResourceException);
@@ -364,7 +363,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 				slotRequestId,
 				resourceProfile,
 				locationPreferences,
-				allowQueuedScheduling);
+				allowQueuedScheduling,
+				allocationTimeout);
 
 			return slotAndLocalityFuture.thenApply(
 				(SlotAndLocality slotAndLocality) -> {
@@ -399,6 +399,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 	 * @param resourceProfile specifying the requirements for the requested slot
 	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the
slot
 	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not
be completed yet) is allowed, otherwise false
+	 * @param allocationTimeout timeout before the slot allocation times out
 	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated{@link
SlotSharingManager.MultiTaskSlot}
 	 * 		and its locality wrt the given location preferences
 	 * @throws NoResourceAvailableException if no task slot could be allocated
@@ -408,7 +409,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			SlotSharingManager multiTaskSlotManager,
 			ResourceProfile resourceProfile,
 			Collection<TaskManagerLocation> locationPreferences,
-			boolean allowQueuedScheduling) throws NoResourceAvailableException {
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) throws NoResourceAvailableException {
 		final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
 
 		if (coLocationSlotRequestId != null) {
@@ -437,7 +439,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			coLocationConstraint.getGroupId(), multiTaskSlotManager,
 			resourceProfile,
 			actualLocationPreferences,
-			allowQueuedScheduling);
+			allowQueuedScheduling,
+			allocationTimeout);
 
 		// check whether we fulfill the co-location constraint
 		if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() !=
Locality.LOCAL) {
@@ -493,6 +496,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 	 * @param resourceProfile specifying the requirements for the requested slot
 	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the
slot
 	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not
be completed yet) is allowed, otherwise false
+	 * @param allocationTimeout timeout before the slot allocation times out
 	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated
{@link SlotSharingManager.MultiTaskSlot}
 	 * 		and its locality wrt the given location preferences
 	 * @throws NoResourceAvailableException if no task slot could be allocated
@@ -502,7 +506,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			SlotSharingManager slotSharingManager,
 			ResourceProfile resourceProfile,
 			Collection<TaskManagerLocation> locationPreferences,
-			boolean allowQueuedScheduling) throws NoResourceAvailableException {
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) throws NoResourceAvailableException {
 
 		// check first whether we have a resolved root slot which we can use
 		SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
@@ -552,7 +557,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 
 			if (multiTaskSlotFuture == null) {
 				// it seems as if we have to request a new slot from the resource manager, this is always
the last resort!!!
-				final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId,
resourceProfile);
+				final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(
+					allocatedSlotRequestId,
+					resourceProfile,
+					allocationTimeout);
 
 				multiTaskSlotFuture = slotSharingManager.createRootSlot(
 					multiTaskSlotRequestId,
@@ -597,13 +605,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 	 * @param resourceProfile which the allocated slot should fulfill
 	 * @param locationPreferences for the allocated slot
 	 * @param allowQueuedScheduling true if the slot allocation can be completed in the future
+	 * @param allocationTimeout timeout before the slot allocation times out
 	 * @return Future containing the allocated simple slot
 	 */
 	private CompletableFuture<SlotAndLocality> requestAllocatedSlot(
 			SlotRequestId slotRequestId,
 			ResourceProfile resourceProfile,
 			Collection<TaskManagerLocation> locationPreferences,
-			boolean allowQueuedScheduling) {
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) {
 
 		final CompletableFuture<SlotAndLocality> allocatedSlotLocalityFuture;
 
@@ -616,7 +626,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			// we have to request a new allocated slot
 			CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestNewAllocatedSlot(
 				slotRequestId,
-				resourceProfile);
+				resourceProfile,
+				allocationTimeout);
 
 			allocatedSlotLocalityFuture = allocatedSlotFuture.thenApply((AllocatedSlot allocatedSlot)
-> new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
 		} else {
@@ -634,16 +645,29 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 	 *
 	 * @param slotRequestId identifying the requested slot
 	 * @param resourceProfile which the requested slot should fulfill
+	 * @param allocationTimeout timeout before the slot allocation times out
 	 * @return An {@link AllocatedSlot} future which is completed once the slot is offered to
the {@link SlotPool}
 	 */
 	private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
-		SlotRequestId slotRequestId,
-		ResourceProfile resourceProfile) {
+			SlotRequestId slotRequestId,
+			ResourceProfile resourceProfile,
+			Time allocationTimeout) {
 
 		final PendingRequest pendingRequest = new PendingRequest(
 			slotRequestId,
 			resourceProfile);
 
+		// register request timeout
+		FutureUtils
+			.orTimeout(pendingRequest.getAllocatedSlotFuture(), allocationTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS)
+			.whenCompleteAsync(
+				(AllocatedSlot ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						removePendingRequest(slotRequestId);
+					}
+				},
+				getMainThreadExecutor());
+
 		if (resourceManagerGateway == null) {
 			stashRequestWaitingForResourceManager(pendingRequest);
 		} else {
@@ -678,15 +702,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
 			rpcTimeout);
 
-		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
-			(Acknowledge value) -> {
-				slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId());
-			},
-			getMainThreadExecutor());
-
 		// on failure, fail the request future
-		slotRequestProcessingFuture.whenCompleteAsync(
-			(Void v, Throwable failure) -> {
+		rmResponse.whenCompleteAsync(
+			(Acknowledge ignored, Throwable failure) -> {
 				if (failure != null) {
 					slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
 				}
@@ -694,12 +712,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 			getMainThreadExecutor());
 	}
 
-	private void slotRequestToResourceManagerSuccess(final SlotRequestId requestId) {
-		// a request is pending from the ResourceManager to a (future) TaskManager
-		// we only add the watcher here in case that request times out
-		scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), slotRequestTimeout);
-	}
-
 	private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure)
{
 		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
 		if (request != null) {
@@ -727,17 +739,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway,
AllocatedS
 				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
 
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
-
-		scheduleRunAsync(() -> checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId()),
slotRequestTimeout);
-	}
-
-	private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) {
-		PendingRequest request = waitingForResourceManager.remove(slotRequestId);
-		if (request != null) {
-			failPendingRequest(
-				request,
-				new TimeoutException("No slot available and no connection to Resource Manager established."));
-		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index bf8037d..a9be9cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -50,7 +51,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -66,7 +66,6 @@ import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -80,6 +79,8 @@ public class SlotPoolRpcTest extends TestLogger {
 
 	private static final Time timeout = Time.seconds(10L);
 
+	private static final Time fastTimeout = Time.milliseconds(1L);
+
 	// ------------------------------------------------------------------------
 	//  setup
 	// ------------------------------------------------------------------------
@@ -111,7 +112,6 @@ public class SlotPoolRpcTest extends TestLogger {
 			jid,
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
-			Time.milliseconds(10L), // this is the timeout for the request tested here
 			TestingUtils.infiniteTime()
 		);
 
@@ -124,7 +124,7 @@ public class SlotPoolRpcTest extends TestLogger {
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
-				TestingUtils.infiniteTime());
+				fastTimeout);
 
 			try {
 				future.get();
@@ -146,7 +146,6 @@ public class SlotPoolRpcTest extends TestLogger {
 			jid,
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
 		try {
@@ -160,27 +159,26 @@ public class SlotPoolRpcTest extends TestLogger {
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
-				Time.milliseconds(10L));
+				fastTimeout);
 
 			try {
 				future.get();
-				fail("We expected a AskTimeoutException.");
+				fail("We expected a TimeoutException.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
 			}
 
-			assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
-
-			slotPoolGateway.releaseSlot(requestId, null, null).get();
-
 			assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
 		} finally {
 			RpcUtils.terminateRpcEndpoint(pool, timeout);
 		}
 	}
 
+	/**
+	 * Tests that a slot allocation times out wrt to the specified time out.
+	 */
 	@Test
-	public void testCancelSlotAllocationWithResourceManager() throws Exception {
+	public void testSlotAllocationTimeout() throws Exception {
 		final JobID jid = new JobID();
 
 		final TestingSlotPool pool = new TestingSlotPool(
@@ -188,7 +186,6 @@ public class SlotPoolRpcTest extends TestLogger {
 			jid,
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
 		try {
@@ -201,22 +198,19 @@ public class SlotPoolRpcTest extends TestLogger {
 			SlotRequestId requestId = new SlotRequestId();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
-				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+				new DummyScheduledUnit(),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
-				Time.milliseconds(10L));
+				fastTimeout);
 
 			try {
 				future.get();
-				fail("We expected a AskTimeoutException.");
+				fail("We expected a TimeoutException.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
 			}
 
-			assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
-
-			slotPoolGateway.releaseSlot(requestId, null, null).get();
 			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
 		} finally {
 			RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -224,10 +218,10 @@ public class SlotPoolRpcTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that allocated slots are not cancelled.
+	 * Tests that extra slots are kept by the {@link SlotPool}.
 	 */
 	@Test
-	public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception {
+	public void testExtraSlotsAreKept() throws Exception {
 		final JobID jid = new JobID();
 
 		final TestingSlotPool pool = new TestingSlotPool(
@@ -235,7 +229,6 @@ public class SlotPoolRpcTest extends TestLogger {
 			jid,
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
 		try {
@@ -257,15 +250,17 @@ public class SlotPoolRpcTest extends TestLogger {
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
 				true,
-				Time.milliseconds(10L));
+				fastTimeout);
 
 			try {
 				future.get();
-				fail("We expected a AskTimeoutException.");
+				fail("We expected a TimeoutException.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
 			}
 
+			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
+
 			AllocationID allocationId = allocationIdFuture.get();
 			final SlotOffer slotOffer = new SlotOffer(
 				allocationId,
@@ -278,13 +273,6 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
 
-			assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
-
-			assertTrue(pool.containsAllocatedSlot(allocationId).get());
-
-			pool.releaseSlot(requestId, null, null).get();
-
-			assertFalse(pool.containsAllocatedSlot(allocationId).get());
 			assertTrue(pool.containsAvailableSlot(allocationId).get());
 		} finally {
 			RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -296,7 +284,7 @@ public class SlotPoolRpcTest extends TestLogger {
 	 * it will automatically call cancelSlotAllocation as will inject future.whenComplete in
ProviderAndOwner.
 	 */
 	@Test
-	public void testProviderAndOwner() throws Exception {
+	public void testProviderAndOwnerSlotAllocationTimeout() throws Exception {
 		final JobID jid = new JobID();
 
 		final TestingSlotPool pool = new TestingSlotPool(
@@ -304,7 +292,6 @@ public class SlotPoolRpcTest extends TestLogger {
 			jid,
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
-			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
 		final CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>();
@@ -317,20 +304,18 @@ public class SlotPoolRpcTest extends TestLogger {
 			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
 			pool.connectToResourceManager(resourceManagerGateway);
 
-			ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
-
 			// test the pending request is clear when timed out
 			CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
-				mockScheduledUnit,
+				new DummyScheduledUnit(),
 				true,
 				Collections.emptyList(),
-				Time.milliseconds(1L));
+				fastTimeout);
 
 			try {
 				future.get();
-				fail("We expected a AskTimeoutException.");
+				fail("We expected a TimeoutException.");
 			} catch (ExecutionException e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
 			}
 
 			// wait for the cancel call on the SlotPool
@@ -353,14 +338,13 @@ public class SlotPoolRpcTest extends TestLogger {
 				RpcService rpcService,
 				JobID jobId,
 				Clock clock,
-				Time slotRequestTimeout,
 				Time rpcTimeout,
 				Time idleSlotTimeout) {
 			super(
 				rpcService,
 				jobId,
 				clock,
-				rpcTimeout, slotRequestTimeout,
+				rpcTimeout,
 				idleSlotTimeout);
 
 			releaseSlotConsumer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6a85461..e6446ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -66,7 +66,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -587,7 +586,7 @@ public class SlotPoolTest extends TestLogger {
 			rpcService,
 			jobId,
 			clock,
-			TestingUtils.infiniteTime(), TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
 			timeout);
 
 		try {
@@ -628,37 +627,6 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that a slot allocation times out wrt to the specified time out.
-	 */
-	@Test
-	public void testSlotAllocationTimeout() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-		final Time allocationTimeout = Time.milliseconds(1L);
-
-		try {
-			setupSlotPool(slotPool, resourceManagerGateway);
-
-			final SlotProvider slotProvider = slotPool.getSlotProvider();
-
-			final CompletableFuture<LogicalSlot> allocationFuture = slotProvider.allocateSlot(
-				new DummyScheduledUnit(),
-				true,
-				Collections.emptyList(),
-				allocationTimeout);
-
-			try {
-				allocationFuture.get();
-				fail("Should have failed with a timeout exception.");
-			} catch (ExecutionException ee) {
-				assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.instanceOf(TimeoutException.class));
-			}
-		} finally {
-			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
-		}
-	}
-
 	private static SlotPoolGateway setupSlotPool(
 			SlotPool slotPool,
 			ResourceManagerGateway resourceManagerGateway) throws Exception {


Mime
View raw message