hbase-commits mailing list archives

From zhang...@apache.org
Subject [03/10] hbase git commit: HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1
Date Thu, 16 Mar 2017 15:00:56 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index bbf8720..3c3d06d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -17,7 +17,13 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
@@ -25,11 +31,14 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -39,36 +48,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
-import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Some basic ipc tests.
  */
@@ -76,59 +71,11 @@ public abstract class AbstractTestIPC {
 
   private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
 
-  private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
-  private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
+  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
   static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
   static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
   static final Configuration CONF = HBaseConfiguration.create();
-  // We are using the test TestRpcServiceProtos generated classes and Service because they are
-  // available and basic with methods like 'echo', and ping. Below we make a blocking service
-  // by passing in implementation of blocking interface. We use this service in all tests that
-  // follow.
-  static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto
-          .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-
-            @Override
-            public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
-                throws ServiceException {
-              if (controller instanceof PayloadCarryingRpcController) {
-                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
-                // If cells, scan them to check we are able to iterate what we were given and since
-                // this is
-                // an echo, just put them back on the controller creating a new block. Tests our
-                // block
-                // building.
-                CellScanner cellScanner = pcrc.cellScanner();
-                List<Cell> list = null;
-                if (cellScanner != null) {
-                  list = new ArrayList<Cell>();
-                  try {
-                    while (cellScanner.advance()) {
-                      list.add(cellScanner.current());
-                    }
-                  } catch (IOException e) {
-                    throw new ServiceException(e);
-                  }
-                }
-                cellScanner = CellUtil.createCellScanner(list);
-                ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
-              }
-              return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
-            }
-          });
 
   /**
    * Instance of server. We actually don't do anything special in here so could just use
@@ -145,149 +92,106 @@ public abstract class AbstractTestIPC {
     }
 
     TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
-      super(null, "testRpcServer", Lists
-          .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
-          "localhost", 0), conf, scheduler);
-    }
-
-    @Override
-    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
-        throws IOException {
-      return super.call(service, md, param, cellScanner, receiveTime, status);
+      super(null, "testRpcServer",
+          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+          new InetSocketAddress("localhost", 0), conf, scheduler);
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
+  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
 
   /**
    * Ensure we do not HAVE TO HAVE a codec.
-   * @throws InterruptedException
-   * @throws IOException
    */
   @Test
-  public void testNoCodec() throws InterruptedException, IOException {
+  public void testNoCodec() throws IOException, ServiceException {
     Configuration conf = HBaseConfiguration.create();
-    AbstractRpcClient client = createRpcClientNoCodec(conf);
     TestRpcServer rpcServer = new TestRpcServer();
-    try {
+    try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      final String message = "hello";
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      Pair<Message, CellScanner> r =
-          client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
-              new MetricsConnection.CallStats());
-      assertTrue(r.getSecond() == null);
-      // Silly assertion that the message is in the returned pb.
-      assertTrue(r.getFirst().toString().contains(message));
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+      String message = "hello";
+      assertEquals(message,
+        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
+      assertNull(pcrc.cellScanner());
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClient(Configuration conf);
+  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
 
   /**
    * It is hard to verify that compression is actually happening under the covers. Hope that, if
    * it is unsupported, we'll get an exception at some point (meantime, we have to trace it
    * manually to confirm that compression is happening down in the client and server).
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws SecurityException
-   * @throws NoSuchMethodException
    */
   @Test
-  public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
-      NoSuchMethodException, ServiceException {
+  public void testCompressCellBlock() throws IOException, ServiceException {
     Configuration conf = new Configuration(HBaseConfiguration.create());
-    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
-    List<Cell> cells = new ArrayList<Cell>();
+    conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
+    List<Cell> cells = new ArrayList<>();
     int count = 3;
     for (int i = 0; i < count; i++) {
       cells.add(CELL);
     }
-    AbstractRpcClient client = createRpcClient(conf);
     TestRpcServer rpcServer = new TestRpcServer();
-    try {
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      PayloadCarryingRpcController pcrc =
-          new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      Pair<Message, CellScanner> r =
-          client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
-              new MetricsConnection.CallStats());
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
+      String message = "hello";
+      assertEquals(message,
+        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
       int index = 0;
-      while (r.getSecond().advance()) {
-        assertTrue(CELL.equals(r.getSecond().current()));
+      CellScanner cellScanner = pcrc.cellScanner();
+      assertNotNull(cellScanner);
+      while (cellScanner.advance()) {
+        assertEquals(CELL, cellScanner.current());
         index++;
       }
       assertEquals(count, index);
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
-      throws IOException;
+  protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
+      Configuration conf) throws IOException;
 
   @Test
   public void testRTEDuringConnectionSetup() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
-    AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
-    try {
+    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      client.call(null, md, param, null, User.getCurrent(), address,
-          new MetricsConnection.CallStats());
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      stub.ping(null, EmptyRequestProto.getDefaultInstance());
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
-      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
-      client.close();
       rpcServer.stop();
     }
   }
 
-  /** Tests that the rpc scheduler is called when requests arrive. */
+  /**
+   * Tests that the rpc scheduler is called when requests arrive.
+   */
   @Test
-  public void testRpcScheduler() throws IOException, InterruptedException {
+  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
     RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
-    AbstractRpcClient client = createRpcClient(CONF);
-    try {
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       verify(scheduler).start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
       for (int i = 0; i < 10; i++) {
-        client.call(new PayloadCarryingRpcController(
-            CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
-            md.getOutputType().toProto(), User.getCurrent(), address,
-            new MetricsConnection.CallStats());
+        stub.echo(null, param);
       }
       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
     } finally {
@@ -298,109 +202,194 @@ public abstract class AbstractTestIPC {
 
   /** Tests that a request larger than the configured maximum request size is rejected. */
   @Test
-  public void testRpcMaxRequestSize() throws IOException, InterruptedException {
+  public void testRpcMaxRequestSize() throws IOException, ServiceException {
     Configuration conf = new Configuration(CONF);
     conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
     RpcServer rpcServer = new TestRpcServer(conf);
-    AbstractRpcClient client = createRpcClient(conf);
-    try {
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       StringBuilder message = new StringBuilder(1200);
       for (int i = 0; i < 200; i++) {
         message.append("hello.");
       }
-      // set total RPC size bigger than 1000 bytes
+      // set total RPC size bigger than 1000 bytes
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      try {
-        client.call(new PayloadCarryingRpcController(
-          CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
-          md.getOutputType().toProto(), User.getCurrent(), address,
-          new MetricsConnection.CallStats());
-        fail("RPC should have failed because it exceeds max request size");
-      } catch(IOException e) {
-        LOG.info("Caught expected exception: " + e);
-        assertTrue(e.toString(),
-            StringUtils.stringifyException(e).contains("RequestTooBigException"));
-      }
+      stub.echo(
+        new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
+        param);
+      fail("RPC should have failed because it exceeds max request size");
+    } catch (ServiceException e) {
+      LOG.info("Caught expected exception: " + e);
+      assertTrue(e.toString(),
+        StringUtils.stringifyException(e).contains("RequestTooBigException"));
     } finally {
       rpcServer.stop();
     }
   }
 
   /**
-   * Instance of RpcServer that echoes client hostAddress back to client
+   * Tests that the RpcServer creates and dispatches a CallRunner object to the scheduler with a
+   * non-null remoteAddress set on its Call object.
+   * @throws ServiceException if the RPC call fails
    */
-  static class TestRpcServer1 extends RpcServer {
-
-    private static BlockingInterface SERVICE1 =
-        new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-          @Override
-          public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
-              throws ServiceException {
-            return EmptyResponseProto.newBuilder().build();
-          }
-
-          @Override
-          public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
-              throws ServiceException {
-            final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
-            final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
-            return EchoResponseProto.newBuilder().setMessage(message).build();
-          }
-
-          @Override
-          public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
-              throws ServiceException {
-            throw new ServiceException("error", new IOException("error"));
-          }
-        };
-
-    TestRpcServer1() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1));
+  @Test
+  public void testRpcServerForNotNullRemoteAddressInCallObject()
+      throws IOException, ServiceException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      assertEquals(localAddr.getAddress().getHostAddress(),
+        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
+    } finally {
+      rpcServer.stop();
     }
+  }
 
-    TestRpcServer1(RpcScheduler scheduler) throws IOException {
-      super(null, "testRemoteAddressInCallObject", Lists
-          .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
-              .newReflectiveBlockingService(SERVICE1), null)),
-          new InetSocketAddress("localhost", 0), CONF, scheduler);
+  @Test
+  public void testRemoteError() throws IOException, ServiceException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      stub.error(null, EmptyRequestProto.getDefaultInstance());
+    } catch (ServiceException e) {
+      LOG.info("Caught expected exception: " + e);
+      IOException ioe = ProtobufUtil.handleRemoteException(e);
+      assertTrue(ioe instanceof DoNotRetryIOException);
+      assertTrue(ioe.getMessage().contains("server error!"));
+    } finally {
+      rpcServer.stop();
     }
   }
 
-  /**
-   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
-   * remoteAddress set to its Call Object
-   * @throws ServiceException
-   */
   @Test
-  public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
-      ServiceException {
-    final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
-    final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
-    final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
-    final AbstractRpcClient client =
-        new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
-    try {
+  public void testTimeout() throws IOException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
-      final InetSocketAddress isa = rpcServer.getListenerAddress();
-      if (isa == null) {
-        throw new IOException("Listener channel is closed");
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+      int ms = 1000;
+      int timeout = 100;
+      for (int i = 0; i < 10; i++) {
+        pcrc.reset();
+        pcrc.setCallTimeout(timeout);
+        long startTime = System.nanoTime();
+        try {
+          stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
+        } catch (ServiceException e) {
+          long waitTime = (System.nanoTime() - startTime) / 1000000;
+          // expected
+          LOG.info("Caught expected exception: " + e);
+          IOException ioe = ProtobufUtil.handleRemoteException(e);
+          assertTrue(ioe.getCause() instanceof CallTimeoutException);
+          // confirm that we got the exception before the actual pause completed.
+          assertTrue(waitTime < ms);
+        }
       }
-      final BlockingRpcChannel channel =
-          client.createBlockingRpcChannel(
-            ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
-            User.getCurrent(), 0);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
-      final EchoRequestProto echoRequest =
-          EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
-      final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
-      Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  static class TestFailingRpcServer extends TestRpcServer {
+
+    TestFailingRpcServer() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1), CONF);
+    }
+
+    TestFailingRpcServer(Configuration conf) throws IOException {
+      this(new FifoRpcScheduler(conf, 1), conf);
+    }
+
+    TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
+      super(scheduler, conf);
+    }
+
+    class FailingConnection extends Connection {
+      public FailingConnection(SocketChannel channel, long lastContact) {
+        super(channel, lastContact);
+      }
+
+      @Override
+      protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+        // this will throw an exception after the connection header is read and an RPC is sent
+        // from the client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected Connection getConnection(SocketChannel channel, long time) {
+      return new FailingConnection(channel, time);
+    }
+  }
+
+  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
+  @Test
+  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
+    Configuration conf = new Configuration(CONF);
+    RpcServer rpcServer = new TestFailingRpcServer(conf);
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      rpcServer.start();
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      stub.echo(null, param);
+      fail("RPC should have failed because connection closed");
+    } catch (ServiceException e) {
+      LOG.info("Caught expected exception: " + e.toString());
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncEcho() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      int num = 10;
+      List<HBaseRpcController> pcrcList = new ArrayList<>();
+      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
+      for (int i = 0; i < num; i++) {
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
+        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
+        pcrcList.add(pcrc);
+        callbackList.add(done);
+      }
+      for (int i = 0; i < num; i++) {
+        HBaseRpcController pcrc = pcrcList.get(i);
+        assertFalse(pcrc.failed());
+        assertNull(pcrc.cellScanner());
+        assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncRemoteError() throws IOException {
+    AbstractRpcClient<?> client = createRpcClient(CONF);
+    TestRpcServer rpcServer = new TestRpcServer();
+    try {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
+      assertNull(callback.get());
+      assertTrue(pcrc.failed());
+      LOG.info("Caught expected exception: " + pcrc.getFailed());
+      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
+      assertTrue(ioe instanceof DoNotRetryIOException);
+      assertTrue(ioe.getMessage().contains("server error!"));
     } finally {
       client.close();
       rpcServer.stop();
@@ -408,17 +397,38 @@ public abstract class AbstractTestIPC {
   }
 
   @Test
-  public void testWrapException() throws Exception {
-    AbstractRpcClient client =
-        (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
-    final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
-    assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
-    assertTrue(client.wrapException(address,
-      new SocketTimeoutException()) instanceof SocketTimeoutException);
-    assertTrue(client.wrapException(address, new ConnectionClosingException(
-        "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
-    assertTrue(client
-        .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
-        .getCause() instanceof CallTimeoutException);
+  public void testAsyncTimeout() throws IOException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      List<HBaseRpcController> pcrcList = new ArrayList<>();
+      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
+      int ms = 1000;
+      int timeout = 100;
+      long startTime = System.nanoTime();
+      for (int i = 0; i < 10; i++) {
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        pcrc.setCallTimeout(timeout);
+        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
+        pcrcList.add(pcrc);
+        callbackList.add(callback);
+      }
+      for (BlockingRpcCallback<?> callback : callbackList) {
+        assertNull(callback.get());
+      }
+      long waitTime = (System.nanoTime() - startTime) / 1000000;
+      for (HBaseRpcController pcrc : pcrcList) {
+        assertTrue(pcrc.failed());
+        LOG.info("Caught expected exception: " + pcrc.getFailed());
+        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
+        assertTrue(ioe.getCause() instanceof CallTimeoutException);
+      }
+      // confirm that we got the exception before the actual pause completed.
+      assertTrue(waitTime < ms);
+    } finally {
+      rpcServer.stop();
+    }
   }
 }
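
Note: the rewritten tests above get their stubs from static helpers on TestProtobufRpcServiceImpl (the SERVICE constant plus newBlockingStub and newStub), a test class added elsewhere in this patch and not included in this message. The following is a rough, hypothetical sketch only (the real class shipped with the patch may differ); it assumes the helpers wrap the same channel-creation calls that the removed code above performed by hand:

package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.security.User;

import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcChannel;

public class TestProtobufRpcServiceImpl {

  // SERVICE would be the BlockingService built with
  // TestProtobufRpcProto.newReflectiveBlockingService(...) around an implementation of
  // ping/echo/error/pause/addr, much like the inline SERVICE block removed above.

  // Blocking stub bound to the server's listener address; mirrors the
  // createBlockingRpcChannel(...) call deleted from
  // testRpcServerForNotNullRemoteAddressInCallObject (method body is an assumption).
  public static BlockingInterface newBlockingStub(AbstractRpcClient<?> client,
      InetSocketAddress addr) throws IOException {
    BlockingRpcChannel channel = client.createBlockingRpcChannel(
      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
      User.getCurrent(), 0);
    return TestProtobufRpcProto.newBlockingStub(channel);
  }

  // Non-blocking stub used by testAsyncEcho, testAsyncRemoteError and testAsyncTimeout
  // (method body is likewise an assumption).
  public static Interface newStub(AbstractRpcClient<?> client, InetSocketAddress addr)
      throws IOException {
    RpcChannel channel = client.createRpcChannel(
      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
      User.getCurrent(), 0);
    return TestProtobufRpcProto.newStub(channel);
  }
}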

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
deleted file mode 100644
index d9b3e49..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-
-@RunWith(Parameterized.class)
-@Category({ SmallTests.class })
-public class TestAsyncIPC extends AbstractTestIPC {
-
-  private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @Parameters
-  public static Collection<Object[]> parameters() {
-    List<Object[]> paramList = new ArrayList<Object[]>();
-    paramList.add(new Object[] { false, false });
-    paramList.add(new Object[] { false, true });
-    paramList.add(new Object[] { true, false });
-    paramList.add(new Object[] { true, true });
-    return paramList;
-  }
-
-  private final boolean useNativeTransport;
-
-  private final boolean useGlobalEventLoopGroup;
-
-  public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
-    this.useNativeTransport = useNativeTransport;
-    this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
-  }
-
-  private void setConf(Configuration conf) {
-    conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
-    if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
-      if (useNativeTransport
-          && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
-          || (!useNativeTransport
-          && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
-        AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
-        AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
-      }
-    }
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf) {
-
-      @Override
-      Codec getCodec() {
-        return null;
-      }
-
-    };
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClient(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf);
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
-          @Override
-          protected void initChannel(SocketChannel ch) throws Exception {
-            ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
-              @Override
-              public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-                  throws Exception {
-                promise.setFailure(new RuntimeException("Injected fault"));
-              }
-            });
-          }
-        });
-  }
-
-  @Test
-  public void testAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    AsyncRpcClient client = createRpcClient(CONF);
-    try {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel =
-          client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
-            System.currentTimeMillis()), User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
-          .toProto(), new RpcCallback<Message>() {
-        @Override
-        public void run(Message parameter) {
-          done.set(true);
-        }
-      });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testRTEDuringAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
-    try {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel =
-          client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
-            System.currentTimeMillis()), User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
-      controller.notifyOnFail(new RpcCallback<IOException>() {
-        @Override
-        public void run(IOException e) {
-          done.set(true);
-          LOG.info("Caught expected exception: " + e.toString());
-          assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
-        }
-      });
-
-      channel.callMethod(md, controller, param, md.getOutputType().toProto(),
-        new RpcCallback<Message>() {
-          @Override
-          public void run(Message parameter) {
-            done.set(true);
-            fail("Expected an exception to have been thrown!");
-          }
-        });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
-  public static void main(String[] args) throws IOException, SecurityException,
-      NoSuchMethodException, InterruptedException {
-    if (args.length != 2) {
-      System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
-      return;
-    }
-    // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
-    // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
-    int cycles = Integer.parseInt(args[0]);
-    int cellcount = Integer.parseInt(args[1]);
-    Configuration conf = HBaseConfiguration.create();
-    TestRpcServer rpcServer = new TestRpcServer();
-    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    AsyncRpcClient client = new AsyncRpcClient(conf);
-    KeyValue kv = BIG_CELL;
-    Put p = new Put(CellUtil.cloneRow(kv));
-    for (int i = 0; i < cellcount; i++) {
-      p.add(kv);
-    }
-    RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
-    rm.add(p);
-    try {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      long startTime = System.currentTimeMillis();
-      User user = User.getCurrent();
-      for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<CellScannable>();
-        // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
-        ClientProtos.RegionAction.Builder builder =
-            RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
-              RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
-              MutationProto.newBuilder());
-        builder.setRegion(RegionSpecifier
-            .newBuilder()
-            .setType(RegionSpecifierType.REGION_NAME)
-            .setValue(
-              ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
-        if (i % 100000 == 0) {
-          LOG.info("" + i);
-          // Uncomment this for a thread dump every so often.
-          // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
-          // "Thread dump " + Thread.currentThread().getName());
-        }
-        PayloadCarryingRpcController pcrc =
-            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
-        // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address,
-            new MetricsConnection.CallStats());
-        /*
-         * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
-         * count);
-         */
-      }
-      LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
-          + (System.currentTimeMillis() - startTime) + "ms");
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
new file mode 100644
index 0000000..98efcfb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestBlockingIPC extends AbstractTestIPC {
+
+  @Override
+  protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
+    return new BlockingRpcClient(conf) {
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  protected BlockingRpcClient createRpcClient(Configuration conf) {
+    return new BlockingRpcClient(conf);
+  }
+
+  @Override
+  protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
+      throws IOException {
+    return new BlockingRpcClient(conf) {
+
+      @Override
+      boolean isTcpNoDelay() {
+        throw new RuntimeException("Injected fault");
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
deleted file mode 100644
index e294830..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ SmallTests.class })
-public class TestGlobalEventLoopGroup {
-
-  @Test
-  public void test() {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
-    AsyncRpcClient client = new AsyncRpcClient(conf);
-    assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
-    AsyncRpcClient client1 = new AsyncRpcClient(conf);
-    assertSame(client.bootstrap.group(), client1.bootstrap.group());
-    client1.close();
-    assertFalse(client.bootstrap.group().isShuttingDown());
-
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
-    AsyncRpcClient client2 = new AsyncRpcClient(conf);
-    assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
-    client2.close();
-
-    client.close();
-  }
-}
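
Note: the deleted TestGlobalEventLoopGroup exercised the old AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP switch. With the new ipc code, sharing or isolating the Netty event loop is configured explicitly through NettyRpcClientConfigHelper, as TestNettyIPC.setConf() below shows. A minimal usage sketch follows; the surrounding class and main method are illustrative only:

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;

public class SharedEventLoopExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Share one NIO event loop group across every NettyRpcClient built from this conf,
    // replacing the old global event loop group toggle.
    NioEventLoopGroup group = new NioEventLoopGroup();
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, group, NioSocketChannel.class);

    // Alternatively, give each client its own event loop group:
    // NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
  }
}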

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
deleted file mode 100644
index d3dbd33..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.net.NetUtils;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-
-@Category({ SmallTests.class })
-public class TestIPC extends AbstractTestIPC {
-
-  private static final Log LOG = LogFactory.getLog(TestIPC.class);
-
-  @Override
-  protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
-      @Override
-      Codec getCodec() {
-        return null;
-      }
-    };
-  }
-
-  @Override
-  protected RpcClientImpl createRpcClient(Configuration conf) {
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
-  }
-
-  @Override
-  protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
-      throws IOException {
-    SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
-    Mockito.doAnswer(new Answer<Socket>() {
-      @Override
-      public Socket answer(InvocationOnMock invocation) throws Throwable {
-        Socket s = spy((Socket) invocation.callRealMethod());
-        doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
-        return s;
-      }
-    }).when(spyFactory).createSocket();
-
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
-  }
-
-  public static void main(String[] args) throws IOException, SecurityException,
-      NoSuchMethodException, InterruptedException {
-    if (args.length != 2) {
-      System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
-      return;
-    }
-    // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
-    // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
-    int cycles = Integer.parseInt(args[0]);
-    int cellcount = Integer.parseInt(args[1]);
-    Configuration conf = HBaseConfiguration.create();
-    TestRpcServer rpcServer = new TestRpcServer();
-    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
-    KeyValue kv = BIG_CELL;
-    Put p = new Put(CellUtil.cloneRow(kv));
-    for (int i = 0; i < cellcount; i++) {
-      p.add(kv);
-    }
-    RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
-    rm.add(p);
-    try {
-      rpcServer.start();
-      long startTime = System.currentTimeMillis();
-      User user = User.getCurrent();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      for (int i = 0; i < cycles; i++) {
-        List<CellScannable> cells = new ArrayList<CellScannable>();
-        // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
-        ClientProtos.RegionAction.Builder builder =
-            RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
-              RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
-              MutationProto.newBuilder());
-        builder.setRegion(RegionSpecifier
-            .newBuilder()
-            .setType(RegionSpecifierType.REGION_NAME)
-            .setValue(
-              ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
-        if (i % 100000 == 0) {
-          LOG.info("" + i);
-          // Uncomment this for a thread dump every so often.
-          // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
-          // "Thread dump " + Thread.currentThread().getName());
-        }
-        PayloadCarryingRpcController pcrc =
-            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
-        // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address,
-            new MetricsConnection.CallStats());
-        /*
-         * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
-         * count);
-         */
-      }
-      LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
-          + (System.currentTimeMillis() - startTime) + "ms");
-    } finally {
-      client.close();
-      rpcServer.stop();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
new file mode 100644
index 0000000..3b32383
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.JVM;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, SmallTests.class })
+public class TestNettyIPC extends AbstractTestIPC {
+
+  @Parameters(name = "{index}: EventLoop={0}")
+  public static Collection<Object[]> parameters() {
+    List<Object[]> params = new ArrayList<>();
+    params.add(new Object[] { "nio" });
+    params.add(new Object[] { "perClientNio" });
+    if (JVM.isLinux() && JVM.isAmd64()) {
+      params.add(new Object[] { "epoll" });
+    }
+    return params;
+  }
+
+  @Parameter
+  public String eventLoopType;
+
+  private static NioEventLoopGroup NIO;
+
+  private static EpollEventLoopGroup EPOLL;
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    NIO = new NioEventLoopGroup();
+    if (JVM.isLinux() && JVM.isAmd64()) {
+      EPOLL = new EpollEventLoopGroup();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    if (NIO != null) {
+      NIO.shutdownGracefully();
+    }
+    if (EPOLL != null) {
+      EPOLL.shutdownGracefully();
+    }
+  }
+
+  private void setConf(Configuration conf) {
+    switch (eventLoopType) {
+      case "nio":
+        NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class);
+        break;
+      case "epoll":
+        NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class);
+        break;
+      case "perClientNio":
+        NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf) {
+
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+
+    };
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClient(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf);
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf) {
+
+      @Override
+      boolean isTcpNoDelay() {
+        throw new RuntimeException("Injected fault");
+      }
+    };
+  }
+}
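The parameterized setup above exercises three event loop modes: a shared NIO group, per-client groups, and (on linux/amd64) epoll. A minimal standalone sketch of the shared-NIO case, assuming the NettyRpcClientConfigHelper methods and the NettyRpcClient(Configuration) constructor used in this file; the sketch lives in org.apache.hadoop.hbase.ipc like the test, and SharedEventLoopSketch is a hypothetical name:

package org.apache.hadoop.hbase.ipc;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch only; class name is illustrative and not part of this patch.
class SharedEventLoopSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // One event loop group shared by every NettyRpcClient built from this conf.
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
      NettyRpcClientConfigHelper.setEventLoopConfig(conf, group, NioSocketChannel.class);
      // Alternatively: NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
      try (NettyRpcClient client = new NettyRpcClient(conf)) {
        // create stubs from the client and issue calls here
      }
    } finally {
      group.shutdownGracefully();
    }
  }
}

Sharing the group, as the nio and epoll parameters do in the test above, avoids a separate set of Netty threads per client; createEventLoopPerClient covers the case where per-client isolation matters more.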

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index ffb3927..3df4cdc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -17,41 +17,39 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
- * Test for testing protocol buffer based RPC mechanism.
- * This test depends on test.proto definition of types in <code>src/test/protobuf/test.proto</code>
- * and protobuf service definition from <code>src/test/protobuf/test_rpc_service.proto</code>
+ * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition
+ * of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from
+ * <code>src/test/protobuf/test_rpc_service.proto</code>
  */
-@Category(MediumTests.class)
+@Category({ RPCTests.class, MediumTests.class })
 public class TestProtoBufRpc {
   public final static String ADDRESS = "localhost";
   public static int PORT = 0;
@@ -59,47 +57,18 @@ public class TestProtoBufRpc {
   private Configuration conf;
   private RpcServerInterface server;
 
-  /**
-   * Implementation of the test service defined out in TestRpcServiceProtos
-   */
-  static class PBServerImpl
-  implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
-    @Override
-    public EmptyResponseProto ping(RpcController unused,
-        EmptyRequestProto request) throws ServiceException {
-      return EmptyResponseProto.newBuilder().build();
-    }
-
-    @Override
-    public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
-        throws ServiceException {
-      return EchoResponseProto.newBuilder().setMessage(request.getMessage())
-          .build();
-    }
-
-    @Override
-    public EmptyResponseProto error(RpcController unused,
-        EmptyRequestProto request) throws ServiceException {
-      throw new ServiceException("error", new IOException("error"));
-    }
-  }
-
   @Before
-  public  void setUp() throws IOException { // Setup server for both protocols
+  public void setUp() throws IOException { // Setup server for both protocols
     this.conf = HBaseConfiguration.create();
     Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
     log.setLevel(Level.DEBUG);
     log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
     log.setLevel(Level.TRACE);
     // Create server side implementation
-    PBServerImpl serverImpl = new PBServerImpl();
-    BlockingService service =
-      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl);
     // Get RPC server for server side implementation
     this.server = new RpcServer(null, "testrpc",
-        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-        new InetSocketAddress(ADDRESS, PORT), conf,
-        new FifoRpcScheduler(conf, 10));
+        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+        new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10));
     InetSocketAddress address = server.getListenerAddress();
     if (address == null) {
       throw new IOException("Listener channel is closed");
@@ -113,31 +82,23 @@ public class TestProtoBufRpc {
     server.stop();
   }
 
-  @Test
+  @Test(expected = ServiceException.class
+  /* Thrown when we call stub.error */)
   public void testProtoBufRpc() throws Exception {
     RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
-      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-          ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
-        User.getCurrent(), 0);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      BlockingInterface stub = newBlockingStub(rpcClient, this.isa);
       // Test ping method
-      TestProtos.EmptyRequestProto emptyRequest =
-        TestProtos.EmptyRequestProto.newBuilder().build();
+      TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build();
       stub.ping(null, emptyRequest);
 
       // Test echo method
       EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build();
       EchoResponseProto echoResponse = stub.echo(null, echoRequest);
-      Assert.assertEquals(echoResponse.getMessage(), "hello");
+      assertEquals(echoResponse.getMessage(), "hello");
 
-      // Test error method - error should be thrown as RemoteException
-      try {
-        stub.error(null, emptyRequest);
-        Assert.fail("Expected exception is not thrown");
-      } catch (ServiceException e) {
-      }
+      stub.error(null, emptyRequest);
+      fail("Expected exception is not thrown");
     } finally {
       rpcClient.close();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
new file mode 100644
index 0000000..8f947b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
+
+@InterfaceAudience.Private
+public class TestProtobufRpcServiceImpl implements BlockingInterface {
+
+  public static final BlockingService SERVICE = TestProtobufRpcProto
+      .newReflectiveBlockingService(new TestProtobufRpcServiceImpl());
+
+  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr)
+      throws IOException {
+    return newBlockingStub(client, addr, User.getCurrent());
+  }
+
+  public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr,
+      User user) throws IOException {
+    return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel(
+      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0));
+  }
+
+  public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
+    return TestProtobufRpcProto.newStub(client.createRpcChannel(
+      ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
+      User.getCurrent(), 0));
+  }
+
+  @Override
+  public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    return EmptyResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+      throws ServiceException {
+    if (controller instanceof HBaseRpcController) {
+      HBaseRpcController pcrc = (HBaseRpcController) controller;
+      // If cells, scan them to check we are able to iterate what we were given and since this is an
+      // echo, just put them back on the controller creating a new block. Tests our block building.
+      CellScanner cellScanner = pcrc.cellScanner();
+      List<Cell> list = null;
+      if (cellScanner != null) {
+        list = new ArrayList<>();
+        try {
+          while (cellScanner.advance()) {
+            list.add(cellScanner.current());
+          }
+        } catch (IOException e) {
+          throw new ServiceException(e);
+        }
+      }
+      cellScanner = CellUtil.createCellScanner(list);
+      pcrc.setCellScanner(cellScanner);
+    }
+    return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+  }
+
+  @Override
+  public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    throw new ServiceException(new DoNotRetryIOException("server error!"));
+  }
+
+  @Override
+  public EmptyResponseProto pause(RpcController controller, PauseRequestProto request)
+      throws ServiceException {
+    Threads.sleepWithoutInterrupt(request.getMs());
+    return EmptyResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public AddrResponseProto addr(RpcController controller, EmptyRequestProto request)
+      throws ServiceException {
+    return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
+        .build();
+  }
+}
\ No newline at end of file
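A minimal sketch of how the SERVICE constant and newBlockingStub helper above are consumed by the reworked tests in this patch (TestProtoBufRpc earlier in this message follows the same shape): publish the test service on an RpcServer, then call echo through a blocking stub. The class and method names in the sketch are hypothetical; the server and client wiring mirrors the patched test code.

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;

// Sketch only; class name is illustrative and not part of this patch.
class TestServiceUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same server wiring as the patched TestProtoBufRpc#setUp.
    RpcServerInterface server = new RpcServer(null, "sketch",
        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
        new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
    server.start();
    InetSocketAddress address = server.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
    try {
      BlockingInterface stub = newBlockingStub(client, address);
      // echo() hands the message straight back; error() would surface as a ServiceException
      // wrapping the DoNotRetryIOException thrown by the implementation above.
      String reply =
          stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()).getMessage();
      assert "hello".equals(reply);
    } finally {
      client.close();
      server.stop();
    }
  }
}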

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index 596b8ab..e4ecd10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -17,15 +17,20 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -35,28 +40,26 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
 
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.junit.Assert.*;
-
-@Category(SmallTests.class)
+@Category(MediumTests.class)
 public class TestRpcClientLeaks {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+      withLookingForStuckThread(true).build();
 
-  public static class MyRpcClientImpl extends RpcClientImpl {
+  public static class MyRpcClientImpl extends BlockingRpcClient {
     public static List<Socket> savedSockets = Lists.newArrayList();
     @Rule public ExpectedException thrown = ExpectedException.none();
 
-    public MyRpcClientImpl(Configuration conf, String clusterId) {
-      super(conf, clusterId);
+    public MyRpcClientImpl(Configuration conf) {
+      super(conf);
     }
 
     public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
@@ -65,9 +68,8 @@ public class TestRpcClientLeaks {
     }
 
     @Override
-    protected Connection createConnection(ConnectionId remoteId, Codec codec,
-        CompressionCodec compressor) throws IOException {
-      return new Connection(remoteId, codec, compressor) {
+    protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+      return new BlockingRpcConnection(this, remoteId) {
         @Override
         protected synchronized void setupConnection() throws IOException {
           super.setupConnection();
@@ -113,5 +115,4 @@ public class TestRpcClientLeaks {
       assertTrue("Socket + " +  socket + " is not closed", socket.isClosed());
     }
   }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 03e9e4e..d197bf2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -17,107 +17,31 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.collect.ImmutableList;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
 
-@Category({SmallTests.class})
+@Category({ RPCTests.class, SmallTests.class })
 public class TestRpcHandlerException {
-  private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
-  static String example = "xyz";
-  static byte[] CELL_BYTES = example.getBytes();
-  static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
 
   private final static Configuration CONF = HBaseConfiguration.create();
-  RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
-
-  // We are using the test TestRpcServiceProtos generated classes and Service because they are
-  // available and basic with methods like 'echo', and ping. Below we make a blocking service
-  // by passing in implementation of blocking interface. We use this service in all tests that
-  // follow.
-  private static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto
-      .newReflectiveBlockingService(new TestRpcServiceProtos
-		  .TestProtobufRpcProto.BlockingInterface() {
-
-        @Override
-        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
-            throws Error, RuntimeException {
-          if (controller instanceof PayloadCarryingRpcController) {
-            PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
-            // If cells, scan them to check we are able to iterate what we were given and since
-            // this is
-            // an echo, just put them back on the controller creating a new block. Tests our
-            // block
-            // building.
-            CellScanner cellScanner = pcrc.cellScanner();
-            List<Cell> list = null;
-            if (cellScanner != null) {
-		list = new ArrayList<Cell>();
-		try {
-			while (cellScanner.advance()) {
-				list.add(cellScanner.current());
-				throw new StackOverflowError();
-			}
-		} catch (StackOverflowError e) {
-			throw e;
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-            }
-            cellScanner = CellUtil.createCellScanner(list);
-            ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
-          }
-          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
-        }
-      });
 
   /**
    * Instance of server. We actually don't do anything speical in here so could just use
@@ -125,29 +49,18 @@ public class TestRpcHandlerException {
    */
   private static class TestRpcServer extends RpcServer {
 
-    TestRpcServer() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1));
-    }
-
     TestRpcServer(RpcScheduler scheduler) throws IOException {
       super(null, "testRpcServer",
-		  Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
-		  new InetSocketAddress("localhost", 0), CONF, scheduler);
-    }
-
-    @Override
-    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
-          throws IOException {
-      return super.call(service, md, param, cellScanner, receiveTime, status);
+          Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
+          new InetSocketAddress("localhost", 0), CONF, scheduler);
     }
   }
 
-  /** Tests that the rpc scheduler is called when requests arrive.
-   *  When Rpc handler thread dies, the client will hang and the test will fail.
-   *  The test is meant to be a unit test to test the behavior.
-   *
-   * */
+  /**
+   * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the
+   * client will hang and the test will fail. The test is meant to be a unit test to test the
+   * behavior.
+   */
   private class AbortServer implements Abortable {
     private boolean aborted = false;
 
@@ -162,7 +75,8 @@ public class TestRpcHandlerException {
     }
   }
 
-  /* This is a unit test to make sure to abort region server when the number of Rpc handler thread
+  /*
+   * This is a unit test to make sure to abort region server when the number of Rpc handler thread
    * caught errors exceeds the threshold. Client will hang when RS aborts.
    */
   @Ignore
@@ -172,21 +86,12 @@ public class TestRpcHandlerException {
     Abortable abortable = new AbortServer();
     RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
     RpcServer rpcServer = new TestRpcServer(scheduler);
-    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
-    try {
+    try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
       rpcServer.start();
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      PayloadCarryingRpcController controller =
-          new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
-          address, new MetricsConnection.CallStats());
+      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+      stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
     } catch (Throwable e) {
-      assert(abortable.isAborted() == true);
+      assert (abortable.isAborted() == true);
     } finally {
       rpcServer.stop();
     }

