Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 60052200B92 for ; Wed, 28 Sep 2016 10:20:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5EA43160AB4; Wed, 28 Sep 2016 08:20:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 95328160AEA for ; Wed, 28 Sep 2016 10:20:47 +0200 (CEST) Received: (qmail 94729 invoked by uid 500); 28 Sep 2016 08:20:46 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 92272 invoked by uid 99); 28 Sep 2016 08:20:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2016 08:20:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DB5DE0BD9; Wed, 28 Sep 2016 08:20:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 28 Sep 2016 08:21:33 -0000 Message-Id: <4879f2a4dcbb401b85303cc8a07abb30@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] flink git commit: [FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable archived-at: Wed, 28 Sep 2016 08:20:50 -0000 [FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable This closes #2520 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b955465f 
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b955465f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b955465f Branch: refs/heads/flip-6 Commit: b955465ff2f230da0ecd195d7d0e8312fdf0578e Parents: b8419ec Author: zhuhaifengleon Authored: Mon Sep 26 17:43:44 2016 +0800 Committer: Till Rohrmann Committed: Tue Sep 27 19:25:02 2016 +0200 ---------------------------------------------------------------------- .../JobMasterToResourceManagerConnection.java | 117 +++++++++++ .../registration/RegisteredRpcConnection.java | 192 +++++++++++++++++++ .../runtime/taskexecutor/TaskExecutor.java | 4 +- ...TaskExecutorToResourceManagerConnection.java | 127 +++--------- .../RegisteredRpcConnectionTest.java | 183 ++++++++++++++++++ .../registration/RetryingRegistrationTest.java | 6 +- 6 files changed, 519 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java new file mode 100644 index 0000000..71fce8c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.concurrent.Future; + +import org.slf4j.Logger; + +import java.util.UUID; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The connection between a JobMaster and the ResourceManager. 
+ */ +public class JobMasterToResourceManagerConnection + extends RegisteredRpcConnection { + + /** the JobMaster whose connection to the ResourceManager this represents */ + private final JobMaster jobMaster; + + private final JobID jobID; + + private final UUID jobMasterLeaderId; + + public JobMasterToResourceManagerConnection( + Logger log, + JobID jobID, + JobMaster jobMaster, + UUID jobMasterLeaderId, + String resourceManagerAddress, + UUID resourceManagerLeaderId, + Executor executor) { + + super(log, resourceManagerAddress, resourceManagerLeaderId, executor); + this.jobMaster = checkNotNull(jobMaster); + this.jobID = checkNotNull(jobID); + this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId); + } + + @Override + protected RetryingRegistration generateRegistration() { + return new JobMasterToResourceManagerConnection.ResourceManagerRegistration( + log, jobMaster.getRpcService(), + getTargetAddress(), getTargetLeaderId(), + jobMaster.getAddress(),jobID, jobMasterLeaderId); + } + + @Override + protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) { + } + + @Override + protected void onRegistrationFailure(Throwable failure) { + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static class ResourceManagerRegistration + extends RetryingRegistration { + + private final String jobMasterAddress; + + private final JobID jobID; + + private final UUID jobMasterLeaderId; + + ResourceManagerRegistration( + Logger log, + RpcService rpcService, + String targetAddress, + UUID leaderId, + String jobMasterAddress, + JobID jobID, + UUID jobMasterLeaderId) { + + super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); + this.jobMasterAddress = checkNotNull(jobMasterAddress); + this.jobID = checkNotNull(jobID); + this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId); + } + + 
@Override + protected Future invokeRegistration( + ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { + + Time timeout = Time.milliseconds(timeoutMillis); + return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java new file mode 100644 index 0000000..76093b0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.registration; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; + +import org.slf4j.Logger; + +import java.util.UUID; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This utility class implements the basis of an RPC connection from one component to another component, + * for example the RPC connection from TaskExecutor to ResourceManager. + * This {@code RegisteredRpcConnection} implements the registration and provides access to the target gateway. + * + *

The registration gives access to a future that is completed upon successful registration. + * The RPC connection can be closed, for example when the target where it tries to register + * at loses leader status. + * + * @param <Gateway> The type of the gateway to connect to. + * @param <Success> The type of the successful registration responses. + */ +public abstract class RegisteredRpcConnection { + + /** the logger for all log messages of this class */ + protected final Logger log; + + /** the target component leaderID, for example the ResourceManager leaderID */ + private final UUID targetLeaderId; + + /** the target component Address, for example the ResourceManager Address */ + private final String targetAddress; + + /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */ + private final Executor executor; + + /** the Registration of this RPC connection */ + private RetryingRegistration pendingRegistration; + + /** the gateway to register, it's null until the registration is completed */ + private volatile Gateway targetGateway; + + /** flag indicating that the RPC connection is closed */ + private volatile boolean closed; + + // ------------------------------------------------------------------------ + + public RegisteredRpcConnection( + Logger log, + String targetAddress, + UUID targetLeaderId, + Executor executor) + { + this.log = checkNotNull(log); + this.targetAddress = checkNotNull(targetAddress); + this.targetLeaderId = checkNotNull(targetLeaderId); + this.executor = checkNotNull(executor); + } + + // ------------------------------------------------------------------------ + // Life cycle + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + public void start() { + checkState(!closed, "The RPC connection is already closed"); + checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started"); + + pendingRegistration = 
checkNotNull(generateRegistration()); + pendingRegistration.startRegistration(); + + Future> future = pendingRegistration.getFuture(); + + future.thenAcceptAsync(new AcceptFunction>() { + @Override + public void accept(Tuple2 result) { + targetGateway = result.f0; + onRegistrationSuccess(result.f1); + } + }, executor); + + // this future should only ever fail if there is a bug, not if the registration is declined + future.exceptionallyAsync(new ApplyFunction() { + @Override + public Void apply(Throwable failure) { + onRegistrationFailure(failure); + return null; + } + }, executor); + } + + /** + * This method generates a specific Registration, for example the TaskExecutor Registration at the ResourceManager + */ + protected abstract RetryingRegistration generateRegistration(); + + /** + * This method handles the Registration Response + */ + protected abstract void onRegistrationSuccess(Success success); + + /** + * This method handles the Registration failure + */ + protected abstract void onRegistrationFailure(Throwable failure); + + /** + * Closes the connection + */ + public void close() { + closed = true; + + // make sure we do not keep re-trying forever + if (pendingRegistration != null) { + pendingRegistration.cancel(); + } + } + + public boolean isClosed() { + return closed; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public UUID getTargetLeaderId() { + return targetLeaderId; + } + + public String getTargetAddress() { + return targetAddress; + } + + /** + * Gets the RegisteredGateway. This returns null until the registration is completed. 
+ */ + public Gateway getTargetGateway() { + return targetGateway; + } + + public boolean isConnected() { + return targetGateway != null; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + String connectionInfo = "(ADDRESS: " + targetAddress + " LEADERID: " + targetLeaderId + ")"; + + if (isConnected()) { + connectionInfo = "RPC connection to " + targetGateway.getClass().getSimpleName() + " " + connectionInfo; + } else { + connectionInfo = "RPC connection to " + connectionInfo; + } + + if (isClosed()) { + connectionInfo = connectionInfo + " is closed"; + } else if (isConnected()){ + connectionInfo = connectionInfo + " is established"; + } else { + connectionInfo = connectionInfo + " is connecting"; + } + + return connectionInfo; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 9e3c3b9..9d9ad2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -178,12 +178,12 @@ public class TaskExecutor extends RpcEndpoint { if (newLeaderAddress != null) { // the resource manager switched to a new leader log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); + resourceManagerConnection.getTargetAddress(), newLeaderAddress); } else { // address null means that the current leader is lost without a new leader being there, yet log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", - resourceManagerConnection.getResourceManagerAddress()); + resourceManagerConnection.getTargetAddress()); } // drop the current connection or connection attempt http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 647359d..b4b3bae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -19,16 +19,14 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.AcceptFunction; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.concurrent.Future; import org.slf4j.Logger; @@ -36,115 +34,46 @@ import java.util.UUID; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; -import static 
org.apache.flink.util.Preconditions.checkState; /** * The connection between a TaskExecutor and the ResourceManager. */ -public class TaskExecutorToResourceManagerConnection { - - /** the logger for all log messages of this class */ - private final Logger log; +public class TaskExecutorToResourceManagerConnection + extends RegisteredRpcConnection { /** the TaskExecutor whose connection to the ResourceManager this represents */ private final TaskExecutor taskExecutor; - private final UUID resourceManagerLeaderId; - - private final String resourceManagerAddress; - - /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */ - private final Executor executor; - - private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; - - private volatile ResourceManagerGateway registeredResourceManager; - private InstanceID registrationId; - /** flag indicating that the connection is closed */ - private volatile boolean closed; - - public TaskExecutorToResourceManagerConnection( - Logger log, - TaskExecutor taskExecutor, - String resourceManagerAddress, - UUID resourceManagerLeaderId, - Executor executor) { + Logger log, + TaskExecutor taskExecutor, + String resourceManagerAddress, + UUID resourceManagerLeaderId, + Executor executor) { - this.log = checkNotNull(log); + super(log, resourceManagerAddress, resourceManagerLeaderId, executor); this.taskExecutor = checkNotNull(taskExecutor); - this.resourceManagerAddress = checkNotNull(resourceManagerAddress); - this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); - this.executor = checkNotNull(executor); - } - - // ------------------------------------------------------------------------ - // Life cycle - // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") - public void start() { - checkState(!closed, "The connection is already closed"); - checkState(!isRegistered() && 
pendingRegistration == null, "The connection is already started"); - - pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( - log, taskExecutor.getRpcService(), - resourceManagerAddress, resourceManagerLeaderId, - taskExecutor.getAddress(), taskExecutor.getResourceID()); - pendingRegistration.startRegistration(); - - Future> future = pendingRegistration.getFuture(); - - future.thenAcceptAsync(new AcceptFunction>() { - @Override - public void accept(Tuple2 result) { - registrationId = result.f1.getRegistrationId(); - registeredResourceManager = result.f0; - } - }, executor); - - // this future should only ever fail if there is a bug, not if the registration is declined - future.exceptionallyAsync(new ApplyFunction() { - @Override - public Void apply(Throwable failure) { - taskExecutor.onFatalErrorAsync(failure); - return null; - } - }, executor); - } - - public void close() { - closed = true; - - // make sure we do not keep re-trying forever - if (pendingRegistration != null) { - pendingRegistration.cancel(); - } } - public boolean isClosed() { - return closed; - } - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public UUID getResourceManagerLeaderId() { - return resourceManagerLeaderId; + @Override + protected RetryingRegistration generateRegistration() { + return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( + log, taskExecutor.getRpcService(), + getTargetAddress(), getTargetLeaderId(), + taskExecutor.getAddress(),taskExecutor.getResourceID()); } - public String getResourceManagerAddress() { - return resourceManagerAddress; + @Override + protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) { + registrationId = success.getRegistrationId(); } - /** - * Gets the ResourceManagerGateway. This returns null until the registration is completed. 
- */ - public ResourceManagerGateway getResourceManager() { - return registeredResourceManager; + @Override + protected void onRegistrationFailure(Throwable failure) { + taskExecutor.onFatalErrorAsync(failure); } /** @@ -155,18 +84,6 @@ public class TaskExecutorToResourceManagerConnection { return registrationId; } - public boolean isRegistered() { - return registeredResourceManager != null; - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return String.format("Connection to ResourceManager %s (leaderId=%s)", - resourceManagerAddress, resourceManagerLeaderId); - } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java new file mode 100644 index 0000000..8558205 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.registration; + +import org.apache.flink.runtime.registration.RetryingRegistrationTest.TestRegistrationSuccess; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.Executor; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for RegisteredRpcConnection, validating the successful, failure and close behavior. 
+ */ +public class RegisteredRpcConnectionTest extends TestLogger { + + @Test + public void testSuccessfulRpcConnection() throws Exception { + final String testRpcConnectionEndpointAddress = ""; + final UUID leaderId = UUID.randomUUID(); + final String connectionID = "Test RPC Connection ID"; + + // an endpoint that immediately returns success + TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); + TestingRpcService rpcService = new TestingRpcService(); + + try { + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); + + // wait for the connection to be established + Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + + // validate correct invocation and result + assertTrue(connection.isConnected()); + assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); + assertEquals(leaderId, connection.getTargetLeaderId()); + assertEquals(testGateway, connection.getTargetGateway()); + assertEquals(connectionID, connection.getConnectionId()); + } + finally { + testGateway.stop(); + rpcService.stopService(); + } + } + + @Test + public void testRpcConnectionFailures() throws Exception { + final String connectionFailureMessage = "Test RPC Connection failure"; + final String testRpcConnectionEndpointAddress = ""; + final UUID leaderId = UUID.randomUUID(); + + TestingRpcService rpcService = new TestingRpcService(); + + try { + // gateway that throws an exception when called + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( + new RuntimeException(connectionFailureMessage)); + + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + + TestRpcConnection 
connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); + + //wait for connection failure + Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT); + + // validate correct invocation and result + assertFalse(connection.isConnected()); + assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); + assertEquals(leaderId, connection.getTargetLeaderId()); + assertNull(connection.getTargetGateway()); + assertEquals(connectionFailureMessage, connection.getFailareMessage()); + } + finally { + rpcService.stopService(); + } + } + + @Test + public void testRpcConnectionClose() throws Exception { + final String testRpcConnectionEndpointAddress = ""; + final UUID leaderId = UUID.randomUUID(); + final String connectionID = "Test RPC Connection ID"; + + TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); + TestingRpcService rpcService = new TestingRpcService(); + + try{ + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); + //close the connection + connection.close(); + + // validate connection is closed + assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); + assertEquals(leaderId, connection.getTargetLeaderId()); + assertTrue(connection.isClosed()); + } + finally { + testGateway.stop(); + rpcService.stopService(); + } + } + + // ------------------------------------------------------------------------ + // test RegisteredRpcConnection + // ------------------------------------------------------------------------ + + private static class TestRpcConnection extends RegisteredRpcConnection { + + private final RpcService rpcService; + + private String connectionId; + + 
private String failureMessage; + + public TestRpcConnection(String targetAddress, + UUID targetLeaderId, + Executor executor, + RpcService rpcService) + { + super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor); + this.rpcService = rpcService; + } + + @Override + protected RetryingRegistration generateRegistration() { + return new RetryingRegistrationTest.TestRetryingRegistration(rpcService, getTargetAddress(), getTargetLeaderId()); + } + + @Override + protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) { + connectionId = success.getCorrelationId(); + } + + @Override + protected void onRegistrationFailure(Throwable failure) { + failureMessage = failure.getMessage(); + } + + public String getConnectionId() { + return connectionId; + } + + public String getFailareMessage() { + return failureMessage; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b955465f/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index e56a9ec..6d6bbef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -298,12 +298,12 @@ public class RetryingRegistrationTest extends TestLogger { // test registration // ------------------------------------------------------------------------ - private static class TestRegistrationSuccess extends RegistrationResponse.Success { + protected static class TestRegistrationSuccess extends RegistrationResponse.Success { private static final long serialVersionUID = 
5542698790917150604L; private final String correlationId; - private TestRegistrationSuccess(String correlationId) { + public TestRegistrationSuccess(String correlationId) { this.correlationId = correlationId; } @@ -312,7 +312,7 @@ public class RetryingRegistrationTest extends TestLogger { } } - private static class TestRetryingRegistration extends RetryingRegistration { + protected static class TestRetryingRegistration extends RetryingRegistration { // we use shorter timeouts here to speed up the tests static final long INITIAL_TIMEOUT = 20;