flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [hotfix] Treat taskManager's rpc address and location separately
Date Mon, 17 Oct 2016 13:38:02 GMT
Repository: flink
Updated Branches:
  refs/heads/flip-6 a19cae3b0 -> 4f891a6c2


[hotfix] Treat taskManager's rpc address and location separately


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e30eab9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e30eab9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e30eab9

Branch: refs/heads/flip-6
Commit: 7e30eab9174ca179809ade60c47d9af54a3717de
Parents: a19cae3
Author: Kurt Young <ykt836@gmail.com>
Authored: Mon Oct 17 09:38:46 2016 +0800
Committer: Kurt Young <ykt836@gmail.com>
Committed: Mon Oct 17 09:38:46 2016 +0800

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  7 ++--
 .../runtime/jobmaster/JobMasterGateway.java     |  8 +++--
 .../runtime/taskexecutor/JobLeaderService.java  | 37 +++++++++++++-------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 14 ++++----
 5 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8cb9946..306a28a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -680,13 +680,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public Future<RegistrationResponse> registerTaskManager(
+			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId) throws Exception
 	{
 		if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
 			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
 							"leader session ID {} did not equal the received leader session ID {}.",
-					taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+					taskManagerLocation.getResourceID(), taskManagerRpcAddress,
 					JobMaster.this.leaderSessionID, leaderId);
 			throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
 					+ ", actual: " + leaderId);
@@ -702,7 +703,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
 				@Override
 				public TaskExecutorGateway call() throws Exception {
-					return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
 							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
 				}
 			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>()
{
@@ -715,7 +716,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
 						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected "
+
 										"leader session ID {} did not equal the received leader session ID {}.",
-								taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+								taskManagerId, taskManagerRpcAddress,
 								JobMaster.this.leaderSessionID, leaderId);
 						return new RegistrationResponse.Decline("Invalid leader session id");
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4c85839..4ee9f92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -196,12 +196,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Register the task manager at the job manager.
 	 *
-	 * @param taskManagerLocation location of the task manager
-	 * @param leaderId            identifying the job leader
-	 * @param timeout             for the rpc call
+	 * @param taskManagerRpcAddress the rpc address of the task manager
+	 * @param taskManagerLocation   location of the task manager
+	 * @param leaderId              identifying the job leader
+	 * @param timeout               for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
 	Future<RegistrationResponse> registerTaskManager(
+			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId,
 			@RpcTimeout final Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 14d36ab..93c7bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -62,6 +62,9 @@ public class JobLeaderService {
 	/** Internal state of the service */
 	private volatile JobLeaderService.State state;
 
+	/** Address of the owner of this service. This address is used for the job manager connection */
+	private String ownerAddress;
+
 	/** Rpc service to use for establishing connections */
 	private RpcService rpcService;
 
@@ -78,6 +81,7 @@ public class JobLeaderService {
 
 		state = JobLeaderService.State.CREATED;
 
+		ownerAddress = null;
 		rpcService = null;
 		highAvailabilityServices = null;
 		jobLeaderListener = null;
@@ -90,20 +94,23 @@ public class JobLeaderService {
 	/**
 	 * Start the job leader service with the given services.
 	 *
+ 	 * @param initialOwnerAddress to be used for establishing connections (source address)
 	 * @param initialRpcService to be used to create rpc connections
 	 * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
 	 * @param initialJobLeaderListener listening for job leader changes
 	 */
 	public void start(
-		final RpcService initialRpcService,
-		final HighAvailabilityServices initialHighAvailabilityServices,
-		final JobLeaderListener initialJobLeaderListener) {
+			final String initialOwnerAddress,
+			final RpcService initialRpcService,
+			final HighAvailabilityServices initialHighAvailabilityServices,
+			final JobLeaderListener initialJobLeaderListener) {
 
 		if (JobLeaderService.State.CREATED != state) {
 			throw new IllegalStateException("The service has already been started.");
 		} else {
 			LOG.info("Start job leader service.");
 
+			this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
 			this.rpcService = Preconditions.checkNotNull(initialRpcService);
 			this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
 			this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -310,6 +317,7 @@ public class JobLeaderService {
 						JobMasterGateway.class,
 						getTargetAddress(),
 						getTargetLeaderId(),
+						ownerAddress,
 						ownLocation);
 			}
 
@@ -345,19 +353,23 @@ public class JobLeaderService {
 			extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
 	{
 
+		private final String taskManagerRpcAddress;
+
 		private final TaskManagerLocation taskManagerLocation;
 
 		JobManagerRetryingRegistration(
-			Logger log,
-			RpcService rpcService,
-			String targetName,
-			Class<JobMasterGateway> targetType,
-			String targetAddress,
-			UUID leaderId,
-			TaskManagerLocation taskManagerLocation) {
-
+				Logger log,
+				RpcService rpcService,
+				String targetName,
+				Class<JobMasterGateway> targetType,
+				String targetAddress,
+				UUID leaderId,
+				String taskManagerRpcAddress,
+				TaskManagerLocation taskManagerLocation)
+		{
 			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
 
+			this.taskManagerRpcAddress = taskManagerRpcAddress;
 			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		}
 
@@ -365,7 +377,8 @@ public class JobLeaderService {
 		protected Future<RegistrationResponse> invokeRegistration(
 				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 		{
-			return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
+			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+					leaderId, Time.milliseconds(timeoutMillis));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 1201281..3e3a544 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		taskSlotTable.start(new SlotActionsImpl());
 
 		// start the job leader service
-		jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
+		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e30eab9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2220f12..2b5d2dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -379,9 +379,10 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			eq(taskManagerLocation),
-			eq(jobManagerLeaderId),
-			any(Time.class)
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jobManagerLeaderId),
+				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId,
blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 
@@ -483,9 +484,10 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			eq(taskManagerLocation),
-			eq(jobManagerLeaderId),
-			any(Time.class)
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jobManagerLeaderId),
+				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId,
blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 


Mime
View raw message