hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [4/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Date Mon, 26 Oct 2015 22:11:31 GMT
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7560581
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7560581
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7560581

Branch: refs/heads/branch-1.1
Commit: b7560581752b477df6f824382461d5fb8fc41fd2
Parents: 9b71dac
Author: Andrew Purtell <apurtell@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Mon Oct 26 15:11:10 2015 -0700

----------------------------------------------------------------------
 .../hbase/ipc/IntegrationTestRpcClient.java     | 22 ++++++++++++++++----
 .../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 17 ++++++++++-----
 .../hbase/regionserver/RSRpcServices.java       |  6 +++++-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 22 +++++++++++++++-----
 .../apache/hadoop/hbase/ipc/TestAsyncIPC.java   |  9 ++++++++
 .../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |  5 ++++-
 .../hadoop/hbase/ipc/TestProtoBufRpc.java       |  6 +++++-
 .../hbase/ipc/TestRpcHandlerException.java      |  7 +++++--
 .../TestRSKilledWhenInitializing.java           |  8 +++++--
 .../hadoop/hbase/security/TestSecureRPC.java    |  9 +++++---
 .../security/token/TestTokenAuthentication.java |  6 +++++-
 13 files changed, 114 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index a99df88..1b425b8 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
 
         TestRpcServer rpcServer = new TestRpcServer(conf);
         rpcServer.start();
-        rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+        InetSocketAddress address = rpcServer.getListenerAddress();        
+        if (address == null) {
+          throw new IOException("Listener channel is closed");
+        }
+        rpcServers.put(address, rpcServer);
         serverList.add(rpcServer);
-        LOG.info("Started server: " + rpcServer.getListenerAddress());
+        LOG.info("Started server: " + address);
         return rpcServer;
       } finally {
         lock.writeLock().unlock();
@@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
         int size = rpcServers.size();
         int rand = random.nextInt(size);
         rpcServer = serverList.remove(rand);
-        rpcServers.remove(rpcServer.getListenerAddress());
+        InetSocketAddress address = rpcServer.getListenerAddress();
+        if (address == null) {
+          // Throw exception here. We can't remove this instance from the server map because
+          // we no longer have access to its map key
+          throw new IOException("Listener channel is closed");
+        }
+        rpcServers.remove(address);
 
         if (rpcServer != null) {
           stopServer(rpcServer);
@@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
         TestRpcServer server = cluster.getRandomServer();
         try {
           User user = User.getCurrent();
+          InetSocketAddress address = server.getListenerAddress();
+          if (address == null) {
+            throw new IOException("Listener channel is closed");
+          }
           ret = (EchoResponseProto)
-              rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+              rpcClient.callBlockingMethod(md, null, param, ret, user, address);
         } catch (Exception e) {
           LOG.warn(e);
           continue; // expected in case connection is closing or closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e2274e9..e329ef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hbase.CellScanner;
@@ -91,8 +92,9 @@ public class CallRunner {
       TraceScope traceScope = null;
       try {
         if (!this.rpcServer.isStarted()) {
-          throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
-              + " is not running yet");
+          InetSocketAddress address = rpcServer.getListenerAddress();
+          throw new ServerNotRunningYetException("Server " +
+              (address != null ? address : "(channel closed)") + " is not running yet");
         }
         if (call.tinfo != null) {
           traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -134,9 +136,10 @@ public class CallRunner {
         throw e;
       }
     } catch (ClosedChannelException cce) {
+      InetSocketAddress address = rpcServer.getListenerAddress();
       RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException,
" +
-          "this means that the server " + rpcServer.getListenerAddress() + " was processing
a " +
-          "request but the client went away. The error message was: " +
+          "this means that the server " + (address != null ? address : "(channel closed)")
+
+          " was processing a request but the client went away. The error message was: " +
           cce.getMessage());
     } catch (Exception e) {
       RpcServer.LOG.warn(Thread.currentThread().getName()

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 47d9825..e3ec22f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1791,8 +1791,9 @@ public class RpcServer implements RpcServerInterface {
             responder, totalRequestSize, null, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+        InetSocketAddress address = getListenerAddress();
         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
-            "Call queue is full on " + getListenerAddress() +
+            "Call queue is full on " + (address != null ? address : "(channel closed)") +
                 ", is hbase.ipc.server.max.callqueue.size too small?");
         responder.doRespond(callTooBig);
         return;
@@ -1820,8 +1821,9 @@ public class RpcServer implements RpcServerInterface {
             buf, offset, buf.length);
         }
       } catch (Throwable t) {
-        String msg = getListenerAddress() + " is unable to read call parameter from client
" +
-            getHostAddress();
+        InetSocketAddress address = getListenerAddress();
+        String msg = (address != null ? address : "(channel closed)") +
+            " is unable to read call parameter from client " + getHostAddress();
         LOG.warn(msg, t);
 
         metrics.exception(t);
@@ -2241,11 +2243,16 @@ public class RpcServer implements RpcServerInterface {
   }
 
   /**
-   * Return the socket (ip+port) on which the RPC server is listening to.
-   * @return the socket (ip+port) on which the RPC server is listening to.
+   * Return the socket (ip+port) on which the RPC server is listening to. May return null
if
+   * the listener channel is closed.
+   * @return the socket (ip+port) on which the RPC server is listening to, or null if this
+   * information cannot be determined
    */
   @Override
   public synchronized InetSocketAddress getListenerAddress() {
+    if (listener == null) {
+      return null;
+    }
     return listener.getAddress();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 61fb87b..ec4f88e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -878,8 +878,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
       DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
 
+    InetSocketAddress address = rpcServer.getListenerAddress();
+    if (address == null) {
+      throw new IOException("Listener channel is closed");
+    }
     // Set our address, however we need the final port that was given to rpcServer
-    isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
     rpcServer.setErrorHandler(this);
     rs.setName(name);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 32eb9f6..528939d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -158,10 +158,13 @@ public abstract class AbstractTestIPC {
     TestRpcServer rpcServer = new TestRpcServer();
     try {
       rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       final String message = "hello";
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       Pair<Message, CellScanner> r =
           client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
       assertTrue(r.getSecond() == null);
@@ -198,12 +201,14 @@ public abstract class AbstractTestIPC {
     TestRpcServer rpcServer = new TestRpcServer();
     try {
       rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
       PayloadCarryingRpcController pcrc =
           new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       Pair<Message, CellScanner> r =
           client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
       int index = 0;
@@ -228,9 +233,12 @@ public abstract class AbstractTestIPC {
     AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
     try {
       rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       client.call(null, md, param, null, User.getCurrent(), address);
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
@@ -254,11 +262,15 @@ public abstract class AbstractTestIPC {
       verify(scheduler).start();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       for (int i = 0; i < 10; i++) {
         client.call(
           new PayloadCarryingRpcController(
               CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
md
-              .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress());
+              .getOutputType().toProto(), User.getCurrent(), address);
       }
       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index ca7c9a7..891acc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -157,6 +157,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
     try {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
 
@@ -193,6 +196,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
     try {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
 
@@ -258,6 +264,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
     try {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       long startTime = System.currentTimeMillis();
       User user = User.getCurrent();
       for (int i = 0; i < cycles; i++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index deee717..41ee4cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -91,9 +91,12 @@ public class TestDelayedRpc {
     RpcClient rpcClient = RpcClientFactory.createClient(
         conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-          ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
-              rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
           User.getCurrent(), RPC_CLIENT_TIMEOUT);
       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -173,9 +176,12 @@ public class TestDelayedRpc {
     RpcClient rpcClient = RpcClientFactory.createClient(
         conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-          ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
-              rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
           User.getCurrent(), RPC_CLIENT_TIMEOUT);
       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -297,9 +303,12 @@ public class TestDelayedRpc {
     RpcClient rpcClient = RpcClientFactory.createClient(
         conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-          ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
-              rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
         User.getCurrent(), 1000);
       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index af10058..6975c6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -124,9 +124,12 @@ public class TestIPC extends AbstractTestIPC {
     rm.add(p);
     try {
       rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
       long startTime = System.currentTimeMillis();
       User user = User.getCurrent();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       for (int i = 0; i < cycles; i++) {
         List<CellScannable> cells = new ArrayList<CellScannable>();
         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY,
rm);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index fc2734f..ffb3927 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
         new InetSocketAddress(ADDRESS, PORT), conf,
         new FifoRpcScheduler(conf, 10));
-    this.isa = server.getListenerAddress();
+    InetSocketAddress address = server.getListenerAddress();
+    if (address == null) {
+      throw new IOException("Listener channel is closed");
+    }
+    this.isa = address;
     this.server.start();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 2c21ebd..c72313d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -178,9 +178,12 @@ public class TestRpcHandlerException {
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
       PayloadCarryingRpcController controller =
           new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-      
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
-          rpcServer.getListenerAddress());
+          address);
     } catch (Throwable e) {
       assert(abortable.isAborted() == true);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 4ad2c31..9a48db7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing {
     @Override
     protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException
{
       if (firstRS.getAndSet(false)) {
+        InetSocketAddress address = super.getRpcServer().getListenerAddress();
+        if (address == null) {
+          throw new IOException("Listener channel is closed");
+        }
         for (NameStringPair e : c.getMapEntriesList()) {
           String key = e.getName();
           // The hostname the master sees us as.
           if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
             String hostnameFromMasterPOV = e.getValue();
-            assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
-              hostnameFromMasterPOV);
+            assertEquals(address.getHostName(), hostnameFromMasterPOV);
           }
         }
         while (!masterActive) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index b4dd62b..a940408 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -142,11 +142,14 @@ public class TestSecureRPC {
     RpcClient rpcClient =
         RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
       BlockingRpcChannel channel =
           rpcClient.createBlockingRpcChannel(
-            ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
-                .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
-            5000);
+            ServerName.valueOf(address.getHostName(), address.getPort(),
+            System.currentTimeMillis()), User.getCurrent(), 5000);
       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
           TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
       List<Integer> results = new ArrayList<Integer>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index b0eb3aa..e068a08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
         AuthenticationProtos.AuthenticationService.BlockingInterface.class));
       this.rpcServer =
         new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf,
1));
-      this.isa = this.rpcServer.getListenerAddress();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      this.isa = address;
       this.sleeper = new Sleeper(1000, this);
     }
 


Mime
View raw message