flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-7387] [rpc] Require RpcEndpoints to directly implement RpcGateways
Date Fri, 11 Aug 2017 11:50:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9f790d3ef -> d95d20eb4


http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a4f0e03..b5a3c80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -74,6 +75,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Ignore;
@@ -89,6 +91,7 @@ import org.slf4j.Logger;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -106,6 +109,8 @@ import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
 
+	private final Time timeout = Time.milliseconds(10000L);
+
 	@Rule
 	public TestName name = new TestName();
 
@@ -426,7 +431,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			verify(heartbeatManager, timeout(timeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
 
-			TaskExecutorGateway taskExecutorGateway = taskManager.getSelf();
+			TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
 
 			// trigger the heartbeat asynchronously
 			taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
@@ -723,7 +728,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			taskManager.start();
 
-			taskManager.submitTask(tdd, jobManagerLeaderId);
+			taskManager.submitTask(tdd, jobManagerLeaderId, timeout);
 
 			CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
 
@@ -844,7 +849,7 @@ public class TaskExecutorTest extends TestLogger {
 			resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
 
 			// request slots from the task manager under the given allocation id
-			taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
+			taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId,
timeout);
 
 			// now inform the task manager about the new job leader
 			jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
@@ -925,7 +930,7 @@ public class TaskExecutorTest extends TestLogger {
 
 		when(jobMasterGateway.offerSlots(
 				any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1)));
+			.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1045,18 +1050,26 @@ public class TaskExecutorTest extends TestLogger {
 
 			// test that allocating a slot works
 			final SlotID slotID = new SlotID(resourceID, 0);
-			taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
+			taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId,
timeout);
 
 			// TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't
need the SlotID...
 			// test that we can't allocate slots which are blacklisted due to pending confirmation
of the RM
 			final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
 
+			CompletableFuture<Acknowledge> requestSlotFuture = taskManager.requestSlot(
+				unconfirmedFreeSlotID,
+				jobId,
+				new AllocationID(),
+				jobManagerAddress,
+				leaderId,
+				timeout);
+
 			try {
-				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress,
leaderId);
+				requestSlotFuture.get();
 
 				fail("The slot request should have failed.");
-			} catch (SlotAllocationException e) {
-				// expected
+			} catch (Exception e) {
+				assertTrue(ExceptionUtils.containsThrowable(e, SlotAllocationException.class));
 			}
 
 			// re-register
@@ -1066,7 +1079,13 @@ public class TaskExecutorTest extends TestLogger {
 
 			// now we should be successful because the slots status has been synced
 			// test that we can't allocate slots which are blacklisted due to pending confirmation
of the RM
-			taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress,
leaderId);
+			taskManager.requestSlot(
+				unconfirmedFreeSlotID,
+				jobId,
+				new AllocationID(),
+				jobManagerAddress,
+				leaderId,
+				timeout);
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -1221,7 +1240,7 @@ public class TaskExecutorTest extends TestLogger {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList());
 
-			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new CompletableFuture<>();
+			CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
 
 			// submit task first and then return acceptance response
 			when(
@@ -1239,7 +1258,7 @@ public class TaskExecutorTest extends TestLogger {
 			verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId),
any(Time.class));
 
 			// submit the task without having acknowledge the offered slots
-			taskManager.submitTask(tdd, jobManagerLeaderId);
+			taskManager.submitTask(tdd, jobManagerLeaderId, timeout);
 
 			// acknowledge the offered slots
 			offerResultFuture.complete(Collections.singleton(offer1));


Mime
View raw message