hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [04/10] hbase git commit: HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1
Date Thu, 16 Mar 2017 15:00:57 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
deleted file mode 100644
index b506b88..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-@Category(SmallTests.class)
-public class TestPayloadCarryingRpcController {
-  @Test
-  public void testListOfCellScannerables() throws IOException {
-    List<CellScannable> cells = new ArrayList<CellScannable>();
-    final int count = 10;
-    for (int i = 0; i < count; i++) {
-      cells.add(createCell(i));
-    }
-    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
-    CellScanner cellScanner = controller.cellScanner();
-    int index = 0;
-    for (; cellScanner.advance(); index++) {
-      Cell cell = cellScanner.current();
-      byte [] indexBytes = Bytes.toBytes(index);
-      assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(),
-        cell.getValueOffset(), cell.getValueLength()));
-    }
-    assertEquals(count, index);
-  }
-
-  /**
-   * @param index
-   * @return A faked out 'Cell' that does nothing but return index as its value
-   */
-  static CellScannable createCell(final int index) {
-    return new CellScannable() {
-      @Override
-      public CellScanner cellScanner() {
-        return new CellScanner() {
-          @Override
-          public Cell current() {
-            // Fake out a Cell.  All this Cell has is a value that is an int in size and equal
-            // to the above 'index' param serialized as an int.
-            return new Cell() {
-              private final int i = index;
-
-              @Override
-              public byte[] getRowArray() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public int getRowOffset() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public short getRowLength() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte[] getFamilyArray() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public int getFamilyOffset() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte getFamilyLength() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte[] getQualifierArray() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public int getQualifierOffset() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public int getQualifierLength() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public long getTimestamp() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte getTypeByte() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public long getMvccVersion() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public long getSequenceId() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte[] getValueArray() {
-                return Bytes.toBytes(this.i);
-              }
-
-              @Override
-              public int getValueOffset() {
-                return 0;
-              }
-
-              @Override
-              public int getValueLength() {
-                return Bytes.SIZEOF_INT;
-              }
-
-              @Override
-              public int getTagsOffset() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public int getTagsLength() {
-                // unused
-                return 0;
-              }
-
-              @Override
-              public byte[] getTagsArray() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public byte[] getValue() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public byte[] getFamily() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public byte[] getQualifier() {
-                // unused
-                return null;
-              }
-
-              @Override
-              public byte[] getRow() {
-                // unused
-                return null;
-              }
-            };
-          }
-
-          private boolean hasCell = true;
-          @Override
-          public boolean advance() {
-            // We have one Cell only so return true first time then false ever after.
-            if (!hasCell) return hasCell;
-            hasCell = false;
-            return true;
-          }
-        };
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
new file mode 100644
index 0000000..12b3661
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Strings;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.TextOutputCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.security.AbstractHBaseSaslRpcClient.SaslClientCallbackHandler;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+@Category({SecurityTests.class, SmallTests.class})
+public class TestHBaseSaslRpcClient {
+
+  static {
+    System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
+    System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
+  }
+
+  static final String DEFAULT_USER_NAME = "principal";
+  static final String DEFAULT_USER_PASSWORD = "password";
+
+  private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class);
+
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void before() {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+  }
+
+  @Test
+  public void testSaslClientUsesGivenRpcProtection() throws Exception {
+    Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
+        DEFAULT_USER_PASSWORD);
+    for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
+      String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
+          "principal/host@DOMAIN.COM", false, qop.name()) {
+        public String getQop() {
+          return saslProps.get(Sasl.QOP);
+        }
+      }.getQop();
+      assertEquals(negotiatedQop, qop.getSaslQop());
+    }
+  }
+
+  @Test
+  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+
+    final NameCallback nameCallback = mock(NameCallback.class);
+    final PasswordCallback passwordCallback = mock(PasswordCallback.class);
+    final RealmCallback realmCallback = mock(RealmCallback.class);
+    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
+
+    Callback[] callbackArray = {nameCallback, passwordCallback,
+        realmCallback, realmChoiceCallback};
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    saslClCallbackHandler.handle(callbackArray);
+    verify(nameCallback).setName(anyString());
+    verify(realmCallback).setText(anyString());
+    verify(passwordCallback).setPassword(any(char[].class));
+  }
+
+  @Test
+  public void testSaslClientCallbackHandlerWithException() {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    try {
+      saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
+    } catch (UnsupportedCallbackException expEx) {
+      //expected
+    } catch (Exception ex) {
+      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testHBaseSaslRpcClientCreation() throws Exception {
+    //creation kerberos principal check section
+    assertFalse(assertSuccessCreationKerberosPrincipal(null));
+    assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM"));
+    assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM"));
+    if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) {
+      // XXX: This can fail if kerberos support in the OS is not sane, see HBASE-10107.
+      // For now, don't assert, just warn
+      LOG.warn("Could not create a SASL client with valid Kerberos credential");
+    }
+
+    //creation digest principal check section
+    assertFalse(assertSuccessCreationDigestPrincipal(null, null));
+    assertFalse(assertSuccessCreationDigestPrincipal("", ""));
+    assertFalse(assertSuccessCreationDigestPrincipal("", null));
+    assertFalse(assertSuccessCreationDigestPrincipal(null, ""));
+    assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //creation simple principal check section
+    assertFalse(assertSuccessCreationSimplePrincipal("", ""));
+    assertFalse(assertSuccessCreationSimplePrincipal(null, null));
+    assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //exceptions check section
+    assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+    assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall(
+        DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+  }
+
+  @Test
+  public void testAuthMethodReadWrite() throws IOException {
+    DataInputBuffer in = new DataInputBuffer();
+    DataOutputBuffer out = new DataOutputBuffer();
+
+    assertAuthMethodRead(in, AuthMethod.SIMPLE);
+    assertAuthMethodRead(in, AuthMethod.KERBEROS);
+    assertAuthMethodRead(in, AuthMethod.DIGEST);
+
+    assertAuthMethodWrite(out, AuthMethod.SIMPLE);
+    assertAuthMethodWrite(out, AuthMethod.KERBEROS);
+    assertAuthMethodWrite(out, AuthMethod.DIGEST);
+  }
+
+  private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod)
+      throws IOException {
+    in.reset(new byte[] {authMethod.code}, 1);
+    assertEquals(authMethod, AuthMethod.read(in));
+  }
+
+  private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod)
+      throws IOException {
+    authMethod.write(out);
+    assertEquals(authMethod.code, out.getData()[0]);
+    out.reset();
+  }
+
+  private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String principal,
+      String password) throws IOException {
+    boolean inState = false;
+    boolean outState = false;
+
+    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+        createTokenMockWithCredentials(principal, password), principal, false) {
+      @Override
+      public SaslClient createDigestSaslClient(String[] mechanismNames,
+          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+              throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+
+      @Override
+      public SaslClient createKerberosSaslClient(String[] mechanismNames,
+          String userFirstPart, String userSecondPart) throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+    };
+
+    try {
+      rpcClient.getInputStream(Mockito.mock(InputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      inState = true;
+    }
+
+    try {
+      rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      outState = true;
+    }
+
+    return inState && outState;
+  }
+
+  private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
+    try {
+      new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false) {
+        @Override
+        public SaslClient createDigestSaslClient(String[] mechanismNames,
+            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+                throws IOException {
+          return null;
+        }
+
+        @Override
+        public SaslClient createKerberosSaslClient(String[] mechanismNames,
+            String userFirstPart, String userSecondPart) throws IOException {
+          return null;
+        }
+      };
+      return false;
+    } catch (IOException ex) {
+      return true;
+    }
+  }
+
+  private boolean assertSuccessCreationKerberosPrincipal(String principal) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientForKerberos(principal);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationSimplePrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientSimple(principal, password);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
+  }
+
+  private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
+      String principal, String password)
+      throws IOException {
+    Token<? extends TokenIdentifier> token = createTokenMock();
+    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) {
+      when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+      when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    }
+    return token;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<? extends TokenIdentifier> createTokenMock() {
+    return mock(Token.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 0a7ac8a..f41efc7 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -19,10 +19,17 @@
 package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -40,27 +47,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.Descriptors.MethodDescriptor;
 
 @Category(IntegrationTests.class)
 public class IntegrationTestRpcClient {
@@ -95,38 +92,13 @@ public class IntegrationTestRpcClient {
     }
   }
 
-  static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto
-      .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-
-        @Override
-        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
-            throws ServiceException {
-          return null;
-        }
-
-        @Override
-        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
-            throws ServiceException {
-          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
-        }
-      });
-
-  protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
-    return isSyncClient ?
-        new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
-          new AsyncRpcClient(conf) {
-          @Override
-          Codec getCodec() {
-            return null;
-          }
-        };
+  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
+    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+    };
   }
 
   static String BIG_PAYLOAD;
@@ -283,7 +255,7 @@ public class IntegrationTestRpcClient {
   }
 
   static class SimpleClient extends Thread {
-    AbstractRpcClient rpcClient;
+    AbstractRpcClient<?> rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
     AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
@@ -292,7 +264,7 @@ public class IntegrationTestRpcClient {
     long numCalls = 0;
     Random random = new Random();
 
-    public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
+    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
       this.cluster = cluster;
       this.rpcClient = rpcClient;
       this.id = id;
@@ -301,24 +273,16 @@ public class IntegrationTestRpcClient {
 
     @Override
     public void run() {
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-
       while (running.get()) {
         boolean isBigPayload = random.nextBoolean();
         String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
         EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
-        EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
-
+        EchoResponseProto ret;
         TestRpcServer server = cluster.getRandomServer();
         try {
-          User user = User.getCurrent();
-          InetSocketAddress address = server.getListenerAddress();
-          if (address == null) {
-            throw new IOException("Listener channel is closed");
-          }
           sending.set(true);
-          ret = (EchoResponseProto)
-              rpcClient.callBlockingMethod(md, null, param, ret, user, address);
+          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
+          ret = stub.echo(null, param);
         } catch (Exception e) {
           LOG.warn(e);
           continue; // expected in case connection is closing or closed
@@ -360,7 +324,7 @@ public class IntegrationTestRpcClient {
     cluster.startServer();
     conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
     for(int i = 0; i <1000; i++) {
-      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
       SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
       client.start();
       while(!client.isSending()) {
@@ -452,7 +416,7 @@ public class IntegrationTestRpcClient {
     ArrayList<SimpleClient> clients = new ArrayList<>();
 
     // all threads should share the same rpc client
-    AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
+    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);
 
     for (int i = 0; i < 30; i++) {
       String clientId = "client_" + i + "_";

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index db73dfe..871ea65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -58,7 +67,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -68,20 +76,17 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CallQueueTooBigException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
-import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@@ -90,7 +95,6 @@ import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -102,17 +106,16 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -134,17 +137,8 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.htrace.TraceInfo;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * An RPC server that hosts protobuf described Services.
@@ -163,8 +157,6 @@ import com.google.protobuf.TextFormat;
  *
  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
  * queue for Responder to pull from and return result to client.
- *
- * @see RpcClientImpl
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
@@ -195,7 +187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    */
   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
 
-  private final IPCUtil ipcUtil;
+  private final CellBlockBuilder cellBlockBuilder;
 
   private static final String AUTH_FAILED_FOR = "Auth failed for ";
   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@@ -468,7 +460,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
         // reservoir when finished. This is hacky and the hack is not contained but benefits are
         // high when we can avoid a big buffer allocation on each rpc.
-        this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+        this.cellBlock = cellBlockBuilder.buildCellBlock(this.connection.codec,
           this.connection.compressionCodec, cells, reservoir);
         if (this.cellBlock != null) {
           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
@@ -1175,6 +1167,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      * @throws IOException
      */
     private boolean processResponse(final Call call) throws IOException {
+      LOG.info("processing " + call);
       boolean error = true;
       try {
         // Send as much data as we can in the non-blocking fashion
@@ -1442,7 +1435,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
               }
               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
-                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
+                  HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
                       secretManager, this));
               break;
             default:
@@ -1462,7 +1455,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                 public Object run() throws SaslException {
                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                       .getMechanismName(), names[0], names[1],
-                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
+                      HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
                   return null;
                 }
               });
@@ -1988,7 +1981,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
         if (header.hasCellBlockMeta()) {
           buf.position(offset);
-          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
+          cellScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressionCodec, buf);
         }
       } catch (Throwable t) {
         InetSocketAddress address = getListenerAddress();
@@ -2194,7 +2187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
 
-    this.ipcUtil = new IPCUtil(conf);
+    this.cellBlockBuilder = new CellBlockBuilder(conf);
 
 
     // Create the responder here
@@ -2346,7 +2339,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       status.setRPCPacket(param);
       status.resume("Servicing call");
       //get an instance of the method arg type
-      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
+      HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
       controller.setCallTimeout(timeout);
       Message result = service.callBlockingMethod(md, controller, param);
       long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 61790d0..71d03ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -52,7 +56,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.FailedServerException;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -66,25 +70,21 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
 /**
  * The ServerManager class manages info about region servers.
  * <p>
@@ -871,7 +871,7 @@ public class ServerManager {
     }
   }
 
-  private PayloadCarryingRpcController newRpcController() {
+  private HBaseRpcController newRpcController() {
     return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
   }
 
@@ -899,7 +899,7 @@ public class ServerManager {
         region.getRegionNameAsString() +
         " failed because no RPC connection found to this server");
     }
-    PayloadCarryingRpcController controller = newRpcController();
+    HBaseRpcController controller = newRpcController();
     return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
       versionOfClosingNode, dest, transitionInZK);
   }
@@ -922,7 +922,7 @@ public class ServerManager {
     if (server == null) return;
     try {
       AdminService.BlockingInterface admin = getRsAdmin(server);
-      PayloadCarryingRpcController controller = newRpcController();
+      HBaseRpcController controller = newRpcController();
       ProtobufUtil.warmupRegion(controller, admin, region);
     } catch (IOException e) {
       LOG.error("Received exception in RPC for warmup server:" +
@@ -938,7 +938,7 @@ public class ServerManager {
   public static void closeRegionSilentlyAndWait(ClusterConnection connection,
     ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
     AdminService.BlockingInterface rs = connection.getAdmin(server);
-    PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
+    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
     try {
       ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
     } catch (IOException e) {
@@ -946,6 +946,7 @@ public class ServerManager {
     }
     long expiration = timeout + System.currentTimeMillis();
     while (System.currentTimeMillis() < expiration) {
+      controller.reset();
       try {
         HRegionInfo rsRegion =
           ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
@@ -989,7 +990,7 @@ public class ServerManager {
           + region_b.getRegionNameAsString()
           + " failed because no RPC connection found to this server");
     }
-    PayloadCarryingRpcController controller = newRpcController();
+    HBaseRpcController controller = newRpcController();
     ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user);
   }
 
@@ -1008,7 +1009,7 @@ public class ServerManager {
         }
       }
       try {
-        PayloadCarryingRpcController controller = newRpcController();
+        HBaseRpcController controller = newRpcController();
         AdminService.BlockingInterface admin = getRsAdmin(server);
         if (admin != null) {
           ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 7a1031c..d13a79c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.protobuf;
 
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -35,7 +37,8 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -46,8 +49,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
 
-import com.google.protobuf.ServiceException;
-
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {
   /**
@@ -66,7 +67,7 @@ public class ReplicationProtbufUtil {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
         buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
           sourceHFileArchiveDir);
-    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+    HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9c89260..49ce348 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
@@ -392,7 +392,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private void addResult(final MutateResponse.Builder builder,
-      final Result result, final PayloadCarryingRpcController rpcc) {
+      final Result result, final HBaseRpcController rpcc) {
     if (result == null) return;
     if (isClientCellBlockSupport()) {
       builder.setResult(ProtobufUtil.toResultNoData(result));
@@ -404,7 +404,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
-      final PayloadCarryingRpcController controller, boolean isDefaultRegion) {
+      final HBaseRpcController controller, boolean isDefaultRegion) {
     builder.setStale(!isDefaultRegion);
     if (results.isEmpty()) {
       return;
@@ -1795,7 +1795,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public ReplicateWALEntryResponse replay(final RpcController controller,
       final ReplicateWALEntryRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTime();
-    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
+    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
     try {
       checkOpen();
       List<WALEntry> entries = request.getEntryList();
@@ -1900,7 +1900,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (regionServer.replicationSinkHandler != null) {
         requestCount.increment();
         List<WALEntry> entries = request.getEntryList();
-        CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
+        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
           request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
@@ -2129,10 +2129,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       } else  if (r != null) {
         ClientProtos.Result pbr;
         RpcCallContext call = RpcServer.getCurrentCall();
-        if (isClientCellBlockSupport(call) && controller instanceof PayloadCarryingRpcController
+        if (isClientCellBlockSupport(call) && controller instanceof HBaseRpcController
             && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) {
           pbr = ProtobufUtil.toResultNoData(r);
-          ((PayloadCarryingRpcController) controller)
+          ((HBaseRpcController) controller)
               .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
           addSize(call, r, null);
         } else {
@@ -2175,7 +2175,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+    HBaseRpcController controller = (HBaseRpcController)rpcc;
     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
     if (controller != null) {
       controller.setCellScanner(null);
@@ -2305,7 +2305,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       final MutateRequest request) throws ServiceException {
     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+    HBaseRpcController controller = (HBaseRpcController)rpcc;
     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
     OperationQuota quota = null;
     RpcCallContext context = RpcServer.getCurrentCall();
@@ -2530,7 +2530,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  private long getTimeLimit(PayloadCarryingRpcController controller,
+  private long getTimeLimit(HBaseRpcController controller,
       boolean allowHeartbeatMessages) {
     // Set the time limit to be half of the more restrictive timeout value (one of the
     // timeout values must be positive). In the event that both values are positive, the
@@ -2559,7 +2559,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   // return whether we have more results in region.
-  private boolean scan(PayloadCarryingRpcController controller, ScanRequest request,
+  private boolean scan(HBaseRpcController controller, ScanRequest request,
       RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List<Result> results,
       ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
       throws IOException {
@@ -2714,9 +2714,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   @Override
   public ScanResponse scan(final RpcController controller, final ScanRequest request)
       throws ServiceException {
-    if (controller != null && !(controller instanceof PayloadCarryingRpcController)) {
+    if (controller != null && !(controller instanceof HBaseRpcController)) {
       throw new UnsupportedOperationException(
-          "We only do PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller);
+          "We only do HBaseRpcController! FIX IF A PROBLEM: " + controller);
     }
     if (!request.hasScannerId() && !request.hasScan()) {
       throw new ServiceException(
@@ -2839,7 +2839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         }
         if (!done) {
-          moreResultsInRegion = scan((PayloadCarryingRpcController) controller, request, rsh,
+          moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh,
             maxQuotaResultSize, rows, results, builder, lastBlock, context);
         }
       }
@@ -2858,7 +2858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // if we have reached the limit of rows
         moreResults = false;
       }
-      addResults(builder, results, (PayloadCarryingRpcController) controller,
+      addResults(builder, results, (HBaseRpcController) controller,
         RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
       if (!moreResults || !moreResultsInRegion || closeScanner) {
         scannerClosed = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 1314a4d..5f6fd45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -35,10 +36,11 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -49,8 +51,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * This class is responsible for replaying the edits coming from a failed region server.
  * <p>
@@ -214,7 +214,7 @@ public class WALEditsReplaySink {
 
       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
+      HBaseRpcController controller = rpcControllerFactory.newController(p.getSecond());
       try {
         remoteSvr.replay(controller, p.getFirst());
       } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 0697013..235e27a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -34,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -45,29 +49,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -76,13 +73,14 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
-
 /**
  * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the
  * WAL, and sends the edits to replicas of regions.
@@ -658,7 +656,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray,
               location.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
         try {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
+          HBaseRpcController controller = rpcControllerFactory.newController(p.getSecond());
           controller.setCallTimeout(timeout);
           controller.setPriority(tableName);
           return stub.replay(controller, p.getFirst());

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index 8b0fa70..644a70d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Locale;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -33,14 +34,14 @@ import javax.security.sasl.RealmCallback;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 
 /**
  * A utility class for dealing with SASL on RPC server
@@ -49,11 +50,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 public class HBaseSaslRpcServer {
   private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
 
+  private static Map<String, String> saslProps = null;
+
   public static void init(Configuration conf) {
-    SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 
+    saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
           QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
   }
 
+  public static Map<String, String> getSaslProps() {
+    return saslProps;
+  }
+
   public static <T extends TokenIdentifier> T getIdentifier(String id,
       SecretManager<T> secretManager) throws InvalidToken {
     byte[] tokenId = SaslUtil.decodeIdentifier(id);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java
index c537fe0..96ac5af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java
@@ -22,6 +22,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
@@ -47,9 +50,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Test MetaTableAccessor but without spinning up a cluster.
  * We mock regionserver back and forth (we do spin up a zk cluster).
@@ -163,7 +163,7 @@ public class TestMetaTableAccessorNoCluster {
           .thenThrow(new ServiceException("Server not running (3 of 3)"))
           .thenAnswer(new Answer<ScanResponse>() {
             public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
-              ((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
+              ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
                   .createCellScanner(cellScannables));
               return builder.setScannerId(1234567890L).setMoreResults(false).build();
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
index de80a7b..350dd84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java
@@ -19,9 +19,12 @@
 package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 
 import java.io.IOException;
 import java.net.ConnectException;
@@ -30,8 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState;
@@ -53,9 +57,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 /**
  * Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
  */
@@ -253,7 +254,7 @@ public class TestMetaTableLocator {
       thenReturn(implementation);
         RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
         Mockito.when(controllerFactory.newController()).thenReturn(
-          Mockito.mock(PayloadCarryingRpcController.class));
+          Mockito.mock(HBaseRpcController.class));
         Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory);
 
     ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index a475e5a..711f520 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -22,40 +22,38 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
-@Category(MediumTests.class)
+@Category({MediumTests.class, ClientTests.class})
 public class TestClientTimeouts {
-  private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static int SLAVES = 1;
 
@@ -86,7 +84,6 @@ public class TestClientTimeouts {
    */
   @Test
   public void testAdminTimeout() throws Exception {
-    Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
     RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -98,12 +95,11 @@ public class TestClientTimeouts {
         // Ensure the HBaseAdmin uses a new connection by changing Configuration.
         Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
         conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
-        HBaseAdmin admin = null;
+        Admin admin = null;
+        Connection connection = null;
         try {
-          admin = new HBaseAdmin(conf);
-          Connection connection = admin.getConnection();
-          assertFalse(connection == lastConnection);
-          lastConnection = connection;
+          connection = ConnectionFactory.createConnection(conf);
+          admin = connection.getAdmin();
           // run some admin commands
           HBaseAdmin.checkHBaseAvailable(conf);
           admin.setBalancerRunning(false, false);
@@ -112,10 +108,15 @@ public class TestClientTimeouts {
           // a MasterNotRunningException.  It's a bug if we get other exceptions.
           lastFailed = true;
         } finally {
-          admin.close();
-          if (admin.getConnection().isClosed()) {
-            rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
-                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
+          if(admin != null) {
+            admin.close();
+            if (admin.getConnection().isClosed()) {
+              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
+                  .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
+            }
+          }
+          if(connection != null) {
+            connection.close();
           }
         }
       }
@@ -130,7 +131,7 @@ public class TestClientTimeouts {
   /**
    * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
    */
-  public static class RandomTimeoutRpcClient extends RpcClientImpl {
+  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
     public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
         MetricsConnection metrics) {
       super(conf, clusterId, localAddr, metrics);
@@ -153,9 +154,9 @@ public class TestClientTimeouts {
     public static final double CHANCE_OF_TIMEOUT = 0.3;
     private static AtomicInteger invokations = new AtomicInteger();
 
-    RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
-        final User ticket, final int rpcTimeout) throws UnknownHostException {
-      super(rpcClient, sn, ticket, rpcTimeout);
+    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
+        final User ticket, final int rpcTimeout) {
+      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
     }
 
     @Override
@@ -172,4 +173,4 @@ public class TestClientTimeouts {
       return super.callBlockingMethod(md, controller, param, returnType);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
index b788e35..ad406b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -31,12 +34,11 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -57,9 +60,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
 @Category(SmallTests.class)
 public class TestHBaseAdminNoCluster {
 
@@ -314,7 +314,7 @@ public class TestHBaseAdminNoCluster {
     RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class);
     Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory);
     Mockito.when(rpcControllerFactory.newController()).thenReturn(
-      Mockito.mock(PayloadCarryingRpcController.class));
+      Mockito.mock(HBaseRpcController.class));
 
     // we need a real retrying caller
     RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index f468c16..6622ae9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import com.google.common.collect.Lists;
 
 import java.io.IOException;
@@ -61,7 +68,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
@@ -86,13 +92,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 /**
  * This class is for testing HBaseConnectionManager features
  */
@@ -726,8 +725,11 @@ public class TestHCM {
     c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
     c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
     c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
-
-    final HTable table = new HTable(c2, tableName);
+    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
+    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
+    ConnectionManager.HConnectionImplementation conn =
+        (ConnectionManager.HConnectionImplementation) ConnectionManager.createConnection(c2);
+    final HTable table = (HTable) conn.getTable(tableName);
 
     Put put = new Put(ROW);
     put.add(FAM_NAM, ROW, ROW);
@@ -749,6 +751,7 @@ public class TestHCM {
             done++;
             if (done % 100 == 0)
               LOG.info("done=" + done);
+            Thread.sleep(100);
           }
         } catch (Throwable t) {
           failed.set(t);
@@ -766,8 +769,6 @@ public class TestHCM {
     });
 
     ServerName sn = table.getRegionLocation(ROW).getServerName();
-    ConnectionManager.HConnectionImplementation conn =
-        (ConnectionManager.HConnectionImplementation) table.getConnection();
     RpcClient rpcClient = conn.getRpcClient();
 
     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index eb989d2..1d49460 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,21 +32,19 @@ import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
-import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 @Category(MediumTests.class)
 public class TestRpcControllerFactory {
 
@@ -55,27 +55,27 @@ public class TestRpcControllerFactory {
     }
 
     @Override
-    public PayloadCarryingRpcController newController() {
+    public HBaseRpcController newController() {
       return new CountingRpcController(super.newController());
     }
 
     @Override
-    public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
+    public HBaseRpcController newController(final CellScanner cellScanner) {
       return new CountingRpcController(super.newController(cellScanner));
     }
 
     @Override
-    public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
+    public HBaseRpcController newController(final List<CellScannable> cellIterables) {
       return new CountingRpcController(super.newController(cellIterables));
     }
   }
 
-  public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
+  public static class CountingRpcController extends DelegatingHBaseRpcController {
 
     private static AtomicInteger INT_PRIORITY = new AtomicInteger();
     private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
 
-    public CountingRpcController(PayloadCarryingRpcController delegate) {
+    public CountingRpcController(HBaseRpcController delegate) {
       super(delegate);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
index cdda28a..da12683 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java
@@ -23,19 +23,24 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.util.Threads;
 
 import java.io.IOException;
 
 /**
  * Test implementation of a coprocessor endpoint exposing the
- * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods.  For internal use by
- * unit tests only.
+ * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests
+ * only.
  */
-public class ProtobufCoprocessorService
-    extends TestRpcServiceProtos.TestProtobufRpcProto
+public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto
     implements CoprocessorService, Coprocessor {
   public ProtobufCoprocessorService() {
   }
@@ -47,31 +52,46 @@ public class ProtobufCoprocessorService
 
   @Override
   public void ping(RpcController controller, TestProtos.EmptyRequestProto request,
-                   RpcCallback<TestProtos.EmptyResponseProto> done) {
+      RpcCallback<TestProtos.EmptyResponseProto> done) {
     done.run(TestProtos.EmptyResponseProto.getDefaultInstance());
   }
 
   @Override
   public void echo(RpcController controller, TestProtos.EchoRequestProto request,
-                   RpcCallback<TestProtos.EchoResponseProto> done) {
+      RpcCallback<TestProtos.EchoResponseProto> done) {
     String message = request.getMessage();
     done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build());
   }
 
   @Override
   public void error(RpcController controller, TestProtos.EmptyRequestProto request,
-                    RpcCallback<TestProtos.EmptyResponseProto> done) {
+      RpcCallback<TestProtos.EmptyResponseProto> done) {
     ResponseConverter.setControllerException(controller, new IOException("Test exception"));
     done.run(null);
   }
 
   @Override
+  public void pause(RpcController controller, PauseRequestProto request,
+      RpcCallback<EmptyResponseProto> done) {
+    Threads.sleep(request.getMs());
+    done.run(EmptyResponseProto.getDefaultInstance());
+  }
+
+  @Override
+  public void addr(RpcController controller, EmptyRequestProto request,
+      RpcCallback<AddrResponseProto> done) {
+    done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress())
+        .build());
+  }
+
+  @Override
   public void start(CoprocessorEnvironment env) throws IOException {
-    //To change body of implemented methods use File | Settings | File Templates.
+    // To change body of implemented methods use File | Settings | File Templates.
   }
 
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
-    //To change body of implemented methods use File | Settings | File Templates.
+    // To change body of implemented methods use File | Settings | File Templates.
   }
+
 }


Mime
View raw message