hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [2/3] hbase git commit: HBASE-18012 Move RpcServer.Connection to a separated file
Date Mon, 15 May 2017 10:08:17 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
new file mode 100644
index 0000000..d4ab95c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -0,0 +1,852 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.crypto.cipher.CryptoCipherFactory;
+import org.apache.commons.crypto.random.CryptoRandom;
+import org.apache.commons.crypto.random.CryptoRandomFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.htrace.TraceInfo;
+
+/** Reads calls from a connection and queues them for handling. */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value="VO_VOLATILE_INCREMENT",
+    justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
+abstract class ServerRpcConnection implements Closeable {
+  /** The {@link RpcServer} instance passed to the constructor. */
+  protected final RpcServer rpcServer;
+  // If initial preamble with version and magic has been read or not.
+  protected boolean connectionPreambleRead = false;
+  // If the connection header has been read or not.
+  protected boolean connectionHeaderRead = false;
+
+  // Passed to createCall() for every call built by processRequest(); null by default.
+  protected CallCleanup callCleanup;
+
+  // Cache the remote host & port info so that even if the socket is
+  // disconnected, we can say where it used to connect to.
+  protected String hostAddress;
+  protected int remotePort;
+  protected InetAddress addr;
+  // Parsed by processConnectionHeader(); null until then.
+  protected ConnectionHeader connectionHeader;
+
+  /**
+   * Codec the client asked us to use.
+   */
+  protected Codec codec;
+  /**
+   * Compression codec the client asked us to use.
+   */
+  protected CompressionCodec compressionCodec;
+  protected BlockingService service;
+
+  protected AuthMethod authMethod;
+  protected boolean saslContextEstablished;
+  protected boolean skipInitialSaslHandshake;
+  // Payload of the RPC being reassembled by processUnwrappedData(); null when none is in progress.
+  private ByteBuffer unwrappedData;
+  // When is this set? FindBugs wants to know! Says NP
+  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
+  protected boolean useSasl;
+  protected SaslServer saslServer;
+  protected CryptoAES cryptoAES;
+  protected boolean useWrap = false;
+  protected boolean useCryptoAesWrap = false;
+  // Fake 'call' for failed authorization response
+  protected static final int AUTHORIZATION_FAILED_CALLID = -1;
+  protected ServerCall<?> authFailedCall;
+  protected ByteArrayOutputStream authFailedResponse =
+      new ByteArrayOutputStream();
+  // Fake 'call' for SASL context setup
+  protected static final int SASL_CALLID = -33;
+  protected ServerCall<?> saslCall;
+  // Fake 'call' for connection header response
+  protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
+  protected ServerCall<?> setConnectionHeaderResponseCall;
+
+  // was authentication allowed with a fallback to simple auth
+  protected boolean authenticatedWithFallback;
+
+  // Set in processConnectionHeader() from the client's version info (minimum version 1.2).
+  protected boolean retryImmediatelySupported = false;
+
+  private UserGroupInformation attemptingUser = null; // user name before auth
+  protected User user = null;
+  protected UserGroupInformation ugi = null;
+
+  /**
+   * Creates a connection for the given server; no call-cleanup hook is set initially.
+   *
+   * @param rpcServer the server this connection reports to
+   */
+  public ServerRpcConnection(RpcServer rpcServer) {
+    this.callCleanup = null;
+    this.rpcServer = rpcServer;
+  }
+
+  /** @return the cached remote host address and port, joined as {@code host:port} */
+  @Override
+  public String toString() {
+    final String host = getHostAddress();
+    return host + ":" + remotePort;
+  }
+
+  /** @return the cached remote host address of this connection */
+  public String getHostAddress() {
+    return this.hostAddress;
+  }
+
+  /** @return the cached remote address of this connection as an {@link InetAddress} */
+  public InetAddress getHostInetAddress() {
+    return this.addr;
+  }
+
+  /** @return the cached remote port of this connection */
+  public int getRemotePort() {
+    return this.remotePort;
+  }
+
+  /**
+   * Returns the client's version info from the connection header.
+   *
+   * @return the version info, or {@code null} if the connection header has not been
+   *         read yet or carries no version info
+   */
+  public VersionInfo getVersionInfo() {
+    // connectionHeader stays null until processConnectionHeader() has parsed it, so
+    // guard against callers that ask before the header has arrived.
+    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
+      return connectionHeader.getVersionInfo();
+    }
+    return null;
+  }
+
+  protected String getFatalConnectionString(final int version, final byte authByte) {
+    return "serverVersion=" + RpcServer.CURRENT_VERSION +
+    ", clientVersion=" + version + ", authMethod=" + authByte +
+    ", authSupported=" + (authMethod != null) + " from " + toString();
+  }
+
+  protected UserGroupInformation getAuthorizedUgi(String authorizedId)
+      throws IOException {
+    UserGroupInformation authorizedUgi;
+    if (authMethod == AuthMethod.DIGEST) {
+      TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
+          this.rpcServer.secretManager);
+      authorizedUgi = tokenId.getUser();
+      if (authorizedUgi == null) {
+        throw new AccessDeniedException(
+            "Can't retrieve username from tokenIdentifier.");
+      }
+      authorizedUgi.addTokenIdentifier(tokenId);
+    } else {
+      authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
+    }
+    authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
+    return authorizedUgi;
+  }
+
+  /**
+   * Set up the cell block codec and, when the header also names one, the cell block
+   * compression codec. Does nothing if the header names no (or an empty) codec class.
+   *
+   * @param header connection header sent by the client
+   * @throws FatalConnectionException declared failure type; the visible code throws
+   *         UnsupportedCellCodecException or UnsupportedCompressionCodecException when the
+   *         named class cannot be loaded, instantiated or cast
+   */
+  protected void setupCellBlockCodecs(final ConnectionHeader header)
+      throws FatalConnectionException {
+    // TODO: Plug in other supported decoders.
+    if (!header.hasCellBlockCodecClass()) return;
+    String className = header.getCellBlockCodecClass();
+    if (className == null || className.length() == 0) return;
+    // SECURITY NOTE(review): className comes from the client's connection header, so the
+    // client chooses which class on the server classpath gets loaded and instantiated.
+    // Consider restricting this to an allow-list of known codecs.
+    try {
+      // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
+      // which rethrows checked constructor exceptions without declaring them.
+      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new UnsupportedCellCodecException(className, e);
+    }
+    if (!header.hasCellBlockCompressorClass()) return;
+    className = header.getCellBlockCompressorClass();
+    // NOTE(review): unlike the codec name above, an empty compressor name is not skipped
+    // here and ends up as UnsupportedCompressionCodecException - confirm this is intended.
+    try {
+      this.compressionCodec =
+          (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new UnsupportedCompressionCodecException(className, e);
+    }
+  }
+
+  /**
+   * Set up cipher for rpc encryption with Apache Commons Crypto.
+   * <p>
+   * Returns without doing anything unless all of the following hold: a SASL server exists,
+   * the negotiated QoP equals the PRIVACY QoP, "hbase.rpc.crypto.encryption.aes.enabled" is
+   * true, and the client's header carries a non-empty cipher transformation. Otherwise it
+   * generates random keys and IVs, creates the server-side {@link CryptoAES}, stores the
+   * cipher metadata for the client in {@code chrBuilder} and sets {@code useCryptoAesWrap}.
+   *
+   * @param header connection header sent by the client
+   * @param chrBuilder connection header response builder that receives the cipher metadata
+   * @throws FatalConnectionException declared failure type; the visible code throws
+   *         UnsupportedCryptoException when key generation or cipher creation fails
+   * @throws IllegalArgumentException if the configured key size is not a multiple of 8 bits
+   */
+  protected void setupCryptoCipher(final ConnectionHeader header,
+      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
+      throws FatalConnectionException {
+    // If simple auth, return
+    if (saslServer == null) return;
+    // check if rpc encryption with Crypto AES
+    String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
+        .getSaslQop().equalsIgnoreCase(qop);
+    boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
+        "hbase.rpc.crypto.encryption.aes.enabled", false);
+    if (!isCryptoAesEncryption) return;
+    if (!header.hasRpcCryptoCipherTransformation()) return;
+    String transformation = header.getRpcCryptoCipherTransformation();
+    if (transformation == null || transformation.length() == 0) return;
+    // Negotiates AES based on complete saslServer.
+    // The Crypto metadata needs to be encrypted and sent to the client.
+    Properties properties = new Properties();
+    // the property for SecureRandomFactory
+    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
+        this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
+            "org.apache.commons.crypto.random.JavaCryptoRandom"));
+    // the property for cipher class
+    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
+        this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
+            "org.apache.commons.crypto.cipher.JceCipher"));
+
+    int cipherKeyBits = this.rpcServer.conf.getInt(
+        "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
+    // generate key and iv
+    if (cipherKeyBits % 8 != 0) {
+      throw new IllegalArgumentException("The AES cipher key size in bits" +
+          " should be a multiple of byte");
+    }
+    // NOTE(review): the IVs below are sized from the key length, not from a fixed cipher
+    // block size - confirm this is intended for key sizes other than the 128-bit default.
+    int len = cipherKeyBits / 8;
+    byte[] inKey = new byte[len];
+    byte[] outKey = new byte[len];
+    byte[] inIv = new byte[len];
+    byte[] outIv = new byte[len];
+
+    try {
+      // generate the cipher meta data with SecureRandom
+      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
+      secureRandom.nextBytes(inKey);
+      secureRandom.nextBytes(outKey);
+      secureRandom.nextBytes(inIv);
+      secureRandom.nextBytes(outIv);
+
+      // create CryptoAES for server
+      cryptoAES = new CryptoAES(transformation, properties,
+          inKey, outKey, inIv, outIv);
+      // create SaslCipherMeta and send to client,
+      //  for client, the [inKey, outKey], [inIv, outIv] should be reversed
+      RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
+      ccmBuilder.setTransformation(transformation);
+      ccmBuilder.setInIv(getByteString(outIv));
+      ccmBuilder.setInKey(getByteString(outKey));
+      ccmBuilder.setOutIv(getByteString(inIv));
+      ccmBuilder.setOutKey(getByteString(inKey));
+      chrBuilder.setCryptoCipherMeta(ccmBuilder);
+      useCryptoAesWrap = true;
+    } catch (GeneralSecurityException | IOException ex) {
+      throw new UnsupportedCryptoException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Converts a byte array to a {@link ByteString}.
+   *
+   * @param bytes the bytes to copy; must not be null
+   * @return {@code ByteString.EMPTY} for a zero-length array, otherwise a copy of the bytes
+   */
+  private ByteString getByteString(byte[] bytes) {
+    if (bytes.length != 0) {
+      return ByteString.copyFrom(bytes);
+    }
+    // return singleton to reduce object allocation
+    return ByteString.EMPTY;
+  }
+
+  protected UserGroupInformation createUser(ConnectionHeader head) {
+    UserGroupInformation ugi = null;
+
+    if (!head.hasUserInfo()) {
+      return null;
+    }
+    UserInformation userInfoProto = head.getUserInfo();
+    String effectiveUser = null;
+    if (userInfoProto.hasEffectiveUser()) {
+      effectiveUser = userInfoProto.getEffectiveUser();
+    }
+    String realUser = null;
+    if (userInfoProto.hasRealUser()) {
+      realUser = userInfoProto.getRealUser();
+    }
+    if (effectiveUser != null) {
+      if (realUser != null) {
+        UserGroupInformation realUserUgi =
+            UserGroupInformation.createRemoteUser(realUser);
+        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
+      } else {
+        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
+      }
+    }
+    return ugi;
+  }
+
+  /**
+   * Disposes of the SASL server, if there is one, and clears the reference.
+   * A {@link SaslException} raised during disposal is ignored.
+   */
+  protected void disposeSasl() {
+    if (saslServer == null) {
+      return;
+    }
+    try {
+      saslServer.dispose();
+    } catch (SaslException ignored) {
+      // Ignored. This is being disposed of anyway.
+    } finally {
+      // Clear the reference even when dispose() fails, so a half-disposed server is
+      // never reused or disposed a second time.
+      saslServer = null;
+    }
+  }
+
+  /**
+   * Sends a raw SASL reply through {@code saslCall}; there is no protobuf encoding of raw
+   * sasl messages. The reply is the status code followed by either the token (on SUCCESS)
+   * or the error class and error message.
+   *
+   * @param status status to report; its state is written first
+   * @param rv token to write when {@code status} is SUCCESS; not read otherwise
+   * @param errorClass error class name written when {@code status} is not SUCCESS
+   * @param error error message written when {@code status} is not SUCCESS
+   * @throws IOException if writing or sending the response fails
+   */
+  protected void doRawSaslReply(SaslStatus status, Writable rv,
+      String errorClass, String error) throws IOException {
+    // In my testing, have noticed that sasl messages are usually
+    // in the ballpark of 100-200. That's why the initial capacity is 256.
+    // try-with-resources closes the wrapper before the stream it wraps, and still closes
+    // the second stream if closing the first one throws.
+    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
+        DataOutputStream out = new DataOutputStream(saslResponse)) {
+      out.writeInt(status.state); // write status
+      if (status == SaslStatus.SUCCESS) {
+        rv.write(out);
+      } else {
+        WritableUtils.writeString(out, errorClass);
+        WritableUtils.writeString(out, error);
+      }
+      saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
+      saslCall.sendResponseIfReady();
+    }
+  }
+
+  /**
+   * Processes one SASL frame from the client.
+   * <p>
+   * Once the SASL context is established the frame carries RPC data: it is passed straight
+   * to {@link #processOneRpc} when wrapping is off, otherwise it is unwrapped (with CryptoAES
+   * or the SASL server) and handed to processUnwrappedData(). Before that, the frame is a
+   * negotiation token: the SASL server is created on first use, the token is evaluated, any
+   * reply token is sent back, and on completion the authenticated user and QoP are recorded.
+   *
+   * @param saslToken the frame read from the client
+   * @throws IOException if negotiation or unwrapping fails; a negotiation failure is also
+   *         reported to the client, counted in the metrics and written to the audit log
+   * @throws InterruptedException declared by the RPC processing this method delegates to
+   */
+  public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
+      InterruptedException {
+    if (saslContextEstablished) {
+      if (RpcServer.LOG.isTraceEnabled())
+        RpcServer.LOG.trace("Have read input token of size " + saslToken.limit()
+            + " for processing by saslServer.unwrap()");
+
+      if (!useWrap) {
+        processOneRpc(saslToken);
+      } else {
+        // NOTE(review): array() is used whole - presumably the backing array holds exactly
+        // this frame (no offset, no spare capacity); confirm against ByteBuff.
+        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
+        byte [] plaintextData;
+        if (useCryptoAesWrap) {
+          // unwrap with CryptoAES
+          plaintextData = cryptoAES.unwrap(b, 0, b.length);
+        } else {
+          plaintextData = saslServer.unwrap(b, 0, b.length);
+        }
+        processUnwrappedData(plaintextData);
+      }
+    } else {
+      byte[] replyToken;
+      try {
+        // Create the SASL server on the first negotiation frame.
+        if (saslServer == null) {
+          switch (authMethod) {
+          case DIGEST:
+            if (this.rpcServer.secretManager == null) {
+              throw new AccessDeniedException(
+                  "Server is not configured to do DIGEST authentication.");
+            }
+            saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+                .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
+                HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
+                    this.rpcServer.secretManager, ugi ->  attemptingUser = ugi));
+            break;
+          default:
+            // Every non-DIGEST method is set up as Kerberos, running as the current user.
+            UserGroupInformation current = UserGroupInformation.getCurrentUser();
+            String fullName = current.getUserName();
+            if (RpcServer.LOG.isDebugEnabled()) {
+              RpcServer.LOG.debug("Kerberos principal name is " + fullName);
+            }
+            final String names[] = SaslUtil.splitKerberosName(fullName);
+            if (names.length != 3) {
+              throw new AccessDeniedException(
+                  "Kerberos principal name does NOT have the expected "
+                      + "hostname part: " + fullName);
+            }
+            current.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws SaslException {
+                saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+                    .getMechanismName(), names[0], names[1],
+                    HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
+                return null;
+              }
+            });
+          }
+          if (saslServer == null)
+            throw new AccessDeniedException(
+                "Unable to find SASL server implementation for "
+                    + authMethod.getMechanismName());
+          if (RpcServer.LOG.isDebugEnabled()) {
+            RpcServer.LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
+          }
+        }
+        if (RpcServer.LOG.isDebugEnabled()) {
+          RpcServer.LOG.debug("Have read input token of size " + saslToken.limit()
+              + " for processing by saslServer.evaluateResponse()");
+        }
+        replyToken = saslServer
+            .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
+      } catch (IOException e) {
+        // Report an InvalidToken found in the cause chain instead of the outer exception.
+        IOException sendToClient = e;
+        Throwable cause = e;
+        while (cause != null) {
+          if (cause instanceof InvalidToken) {
+            sendToClient = (InvalidToken) cause;
+            break;
+          }
+          cause = cause.getCause();
+        }
+        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+          sendToClient.getLocalizedMessage());
+        this.rpcServer.metrics.authenticationFailure();
+        String clientIP = this.toString();
+        // attempting user could be null
+        RpcServer.AUDITLOG.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+        throw e;
+      }
+      if (replyToken != null) {
+        if (RpcServer.LOG.isDebugEnabled()) {
+          RpcServer.LOG.debug("Will send token of size " + replyToken.length
+              + " from saslServer.");
+        }
+        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
+            null);
+      }
+      if (saslServer.isComplete()) {
+        String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        // Any negotiated QoP other than "auth" means later frames must be unwrapped.
+        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+        ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
+        if (RpcServer.LOG.isDebugEnabled()) {
+          RpcServer.LOG.debug("SASL server context established. Authenticated client: "
+            + ugi + ". Negotiated QoP is "
+            + saslServer.getNegotiatedProperty(Sasl.QOP));
+        }
+        this.rpcServer.metrics.authenticationSuccess();
+        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
+        saslContextEstablished = true;
+      }
+    }
+  }
+
+  /**
+   * Splits unwrapped plaintext into length-prefixed RPCs and processes each complete one.
+   * An RPC may span several calls: the partially read length prefix and payload are kept in
+   * {@code unwrappedDataLengthBuffer} and {@code unwrappedData} between invocations.
+   *
+   * @param inBuf plaintext produced by unwrapping one SASL frame
+   * @throws IOException if reading or processing an RPC fails
+   * @throws InterruptedException declared by {@link #processOneRpc}
+   */
+  private void processUnwrappedData(byte[] inBuf) throws IOException,
+  InterruptedException {
+    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
+    // Read all RPCs contained in the inBuf, even partial ones
+    while (true) {
+      int count;
+      if (unwrappedDataLengthBuffer.remaining() > 0) {
+        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
+        // Input exhausted or length prefix still incomplete: resume on the next call.
+        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+          return;
+      }
+
+      if (unwrappedData == null) {
+        unwrappedDataLengthBuffer.flip();
+        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
+          if (RpcServer.LOG.isDebugEnabled())
+            RpcServer.LOG.debug("Received ping message");
+          unwrappedDataLengthBuffer.clear();
+          continue; // ping message
+        }
+        // NOTE(review): the length is not validated; a negative value other than
+        // PING_CALL_ID makes ByteBuffer.allocate throw IllegalArgumentException.
+        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+      }
+
+      count = this.rpcServer.channelRead(ch, unwrappedData);
+      // Input exhausted or payload still incomplete: resume on the next call.
+      if (count <= 0 || unwrappedData.remaining() > 0)
+        return;
+
+      // Always true here, given the return just above.
+      if (unwrappedData.remaining() == 0) {
+        unwrappedDataLengthBuffer.clear();
+        unwrappedData.flip();
+        processOneRpc(new SingleByteBuff(unwrappedData));
+        unwrappedData = null;
+      }
+    }
+  }
+
+  /**
+   * Handles one complete RPC frame. The first frame on a connection is the connection
+   * header, which is parsed and then authorized; every later frame is a request.
+   *
+   * @param buf the frame contents
+   * @throws IOException if processing fails, including AccessDeniedException when the
+   *         connection is not authorized
+   * @throws InterruptedException declared by request processing
+   */
+  public void processOneRpc(ByteBuff buf) throws IOException,
+      InterruptedException {
+    if (!connectionHeaderRead) {
+      processConnectionHeader(buf);
+      this.connectionHeaderRead = true;
+      final boolean authorized = authorizeConnection();
+      if (!authorized) {
+        // Throw AccessDeniedException so the client does the right thing and closes
+        // down the connection instead of trying to read a non-existent return.
+        throw new AccessDeniedException("Connection from " + this + " for service " +
+          connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
+      }
+      this.user = this.rpcServer.userProvider.create(this.ugi);
+      return;
+    }
+    processRequest(buf);
+  }
+
+  protected boolean authorizeConnection() throws IOException {
+    try {
+      // If auth method is DIGEST, the token was obtained by the
+      // real user for the effective user, therefore not required to
+      // authorize real user. doAs is allowed only for simple or kerberos
+      // authentication
+      if (ugi != null && ugi.getRealUser() != null
+          && (authMethod != AuthMethod.DIGEST)) {
+        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
+      }
+      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
+      this.rpcServer.metrics.authorizationSuccess();
+    } catch (AuthorizationException ae) {
+      if (RpcServer.LOG.isDebugEnabled()) {
+        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
+      }
+      this.rpcServer.metrics.authorizationFailure();
+      this.rpcServer.setupResponse(authFailedResponse, authFailedCall,
+        new AccessDeniedException(ae), ae.getMessage());
+      authFailedCall.sendResponseIfReady();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Reads the connection header following version.
+   * <p>
+   * Parses the header, resolves the requested service, sets up the cell block codecs and
+   * the optional Crypto AES cipher, sends the connection header response when one is
+   * needed, and determines the effective {@code ugi}: the user named in the header for
+   * non-SASL connections, or the SASL-authenticated user (possibly acting as a proxy for
+   * the user named in the header) otherwise.
+   *
+   * @param buf buffer holding the serialized connection header
+   * @throws IOException if the header cannot be parsed, the service is unknown, codec or
+   *         cipher setup fails, or a DIGEST-authenticated user claims a different identity
+   */
+  protected void processConnectionHeader(ByteBuff buf) throws IOException {
+    if (buf.hasArray()) {
+      // NOTE(review): parses the whole backing array - presumably it holds exactly the
+      // header bytes (no offset, no spare capacity); confirm against the callers.
+      this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+    } else {
+      CodedInputStream cis = UnsafeByteOperations
+          .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
+      cis.enableAliasing(true);
+      this.connectionHeader = ConnectionHeader.parseFrom(cis);
+    }
+    String serviceName = connectionHeader.getServiceName();
+    // NOTE(review): protobuf string getters usually return "" rather than null, so this
+    // check may never fire - confirm.
+    if (serviceName == null) throw new EmptyServiceNameException();
+    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
+    if (this.service == null) throw new UnknownServiceException(serviceName);
+    setupCellBlockCodecs(this.connectionHeader);
+    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
+        RPCProtos.ConnectionHeaderResponse.newBuilder();
+    setupCryptoCipher(this.connectionHeader, chrBuilder);
+    responseConnectionHeader(chrBuilder);
+    UserGroupInformation protocolUser = createUser(connectionHeader);
+    if (!useSasl) {
+      ugi = protocolUser;
+      if (ugi != null) {
+        ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+      }
+      // audit logging for SASL authenticated users happens in saslReadAndProcess()
+      if (authenticatedWithFallback) {
+        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
+            + " connecting from " + getHostAddress());
+      }
+      RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
+    } else {
+      // user is authenticated
+      ugi.setAuthenticationMethod(authMethod.authenticationMethod);
+      //Now we check if this is a proxy user case. If the protocol user is
+      //different from the 'user', it is a proxy user scenario. However,
+      //this is not allowed if user authenticated with DIGEST.
+      if ((protocolUser != null)
+          && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
+        if (authMethod == AuthMethod.DIGEST) {
+          // Not allowed to doAs if token authentication is used
+          throw new AccessDeniedException("Authenticated user (" + ugi
+              + ") doesn't match what the client claims to be ("
+              + protocolUser + ")");
+        } else {
+          // Effective user can be different from authenticated user
+          // for simple auth or kerberos auth
+          // The user is the real user. Now we create a proxy user
+          UserGroupInformation realUser = ugi;
+          ugi = UserGroupInformation.createProxyUser(protocolUser
+              .getUserName(), realUser);
+          // Now the user is a proxy user, set Authentication method Proxy.
+          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
+        }
+      }
+    }
+    if (connectionHeader.hasVersionInfo()) {
+      // see if this connection will support RetryImmediatelyException
+      retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
+
+      RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+          + " with version info: "
+          + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
+    } else {
+      RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+          + " with unknown version info");
+    }
+  }
+
+  private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
+      throws FatalConnectionException {
+    // Response the connection header if Crypto AES is enabled
+    if (!chrBuilder.hasCryptoCipherMeta()) return;
+    try {
+      byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
+      // encrypt the Crypto AES cipher meta data with sasl server, and send to client
+      byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
+      Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
+      Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
+
+      doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
+    } catch (IOException ex) {
+      throw new UnsupportedCryptoException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Send the response for connection header: the 4-byte length of the wrapped cipher
+   * metadata followed by the metadata itself, sent through
+   * {@code setConnectionHeaderResponseCall}.
+   *
+   * @param wrappedCipherMetaData the SASL-wrapped connection header response bytes
+   * @throws IOException if writing or sending the response fails
+   */
+  private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
+      throws IOException {
+    // try-with-resources closes out and then response, and still closes response if
+    // closing out throws.
+    try (ByteBufferOutputStream response =
+            new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
+        DataOutputStream out = new DataOutputStream(response)) {
+      out.writeInt(wrappedCipherMetaData.length);
+      out.write(wrappedCipherMetaData);
+
+      setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
+          .getByteBuffer());
+      setConnectionHeaderResponseCall.sendResponseIfReady();
+    }
+  }
+
+  /**
+   * @param buf
+   *          Has the request header and the request param and optionally
+   *          encoded data buffer all in this one array.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected void processRequest(ByteBuff buf) throws IOException,
+      InterruptedException {
+    long totalRequestSize = buf.limit();
+    int offset = 0;
+    // Here we read in the header. We avoid having pb
+    // do its default 4k allocation for CodedInputStream. We force it to use
+    // backing array.
+    CodedInputStream cis;
+    if (buf.hasArray()) {
+      cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
+    } else {
+      cis = UnsafeByteOperations
+          .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
+    }
+    cis.enableAliasing(true);
+    int headerSize = cis.readRawVarint32();
+    offset = cis.getTotalBytesRead();
+    Message.Builder builder = RequestHeader.newBuilder();
+    ProtobufUtil.mergeFrom(builder, cis, headerSize);
+    RequestHeader header = (RequestHeader) builder.build();
+    offset += headerSize;
+    int id = header.getCallId();
+    if (RpcServer.LOG.isTraceEnabled()) {
+      RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
+          + " totalRequestSize: " + totalRequestSize + " bytes");
+    }
+    // Enforcing the call queue size, this triggers a retry in the client
+    // This is a bit late to be doing this check - we have already read in the
+    // total request.
+    if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
+      final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
+        totalRequestSize, null, null, 0, this.callCleanup);
+      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+      this.rpcServer.setupResponse(responseBuffer, callTooBig, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+          "Call queue is full on " + this.rpcServer.server.getServerName()
+              + ", is hbase.ipc.server.max.callqueue.size too small?");
+      callTooBig.sendResponseIfReady();
+      return;
+    }
+    MethodDescriptor md = null;
+    Message param = null;
+    CellScanner cellScanner = null;
+    try {
+      if (header.hasRequestParam() && header.getRequestParam()) {
+        md = this.service.getDescriptorForType().findMethodByName(
+            header.getMethodName());
+        if (md == null)
+          throw new UnsupportedOperationException(header.getMethodName());
+        builder = this.service.getRequestPrototype(md).newBuilderForType();
+        cis.resetSizeCounter();
+        int paramSize = cis.readRawVarint32();
+        offset += cis.getTotalBytesRead();
+        if (builder != null) {
+          ProtobufUtil.mergeFrom(builder, cis, paramSize);
+          param = builder.build();
+        }
+        offset += paramSize;
+      } else {
+        // currently header must have request param, so we directly throw
+        // exception here
+        String msg = "Invalid request header: "
+            + TextFormat.shortDebugString(header)
+            + ", should have param set in it";
+        RpcServer.LOG.warn(msg);
+        throw new DoNotRetryIOException(msg);
+      }
+      if (header.hasCellBlockMeta()) {
+        buf.position(offset);
+        ByteBuff dup = buf.duplicate();
+        dup.limit(offset + header.getCellBlockMeta().getLength());
+        cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
+            this.codec, this.compressionCodec, dup);
+      }
+    } catch (Throwable t) {
+      InetSocketAddress address = this.rpcServer.getListenerAddress();
+      String msg = (address != null ? address : "(channel closed)")
+          + " is unable to read call parameter from client "
+          + getHostAddress();
+      RpcServer.LOG.warn(msg, t);
+
+      this.rpcServer.metrics.exception(t);
+
+      // probably the hbase hadoop version does not match the running hadoop
+      // version
+      if (t instanceof LinkageError) {
+        t = new DoNotRetryIOException(t);
+      }
+      // If the method is not present on the server, do not retry.
+      if (t instanceof UnsupportedOperationException) {
+        t = new DoNotRetryIOException(t);
+      }
+
+      final ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
+        totalRequestSize, null, null, 0, this.callCleanup);
+      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+      this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t,
+          msg + "; " + t.getMessage());
+      readParamsFailedCall.sendResponseIfReady();
+      return;
+    }
+
+    TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
+        .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
+        : null;
+    int timeout = 0;
+    if (header.hasTimeout() && header.getTimeout() > 0) {
+      timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
+    }
+    ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
+      traceInfo, this.addr, timeout, this.callCleanup);
+
+    if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
+      this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
+
+      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+      this.rpcServer.setupResponse(responseBuffer, call, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+          "Call queue is full on " + this.rpcServer.server.getServerName()
+              + ", too many items queued ?");
+      call.sendResponseIfReady();
+    }
+  }
+  /** Subclass hook; presumably reports whether the underlying connection is still open. */
+  public abstract boolean isConnectionOpen();
+  /** Creates the call for request id; md, header and param are null when only an error is sent. */
+  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
+      RequestHeader header, Message param, CellScanner cellScanner, long size, TraceInfo tinfo,
+      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
+
+  /** {@link ByteInput} over a window of a {@link ByteBuff}; reads are not bounds-checked here. */
+  private static class ByteBuffByteInput extends ByteInput {
+    private final ByteBuff buf;
+    private final int offset; // start of the window inside buf
+    private final int length; // value reported by size()
+
+    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
+      this.buf = buf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    private int getAbsoluteOffset(int relative) {
+      return relative + offset;
+    }
+
+    @Override
+    public byte read(int index) {
+      return buf.get(getAbsoluteOffset(index));
+    }
+
+    @Override
+    public int read(int index, byte[] out, int outOffset, int len) {
+      buf.get(getAbsoluteOffset(index), out, outOffset, len);
+      return len;
+    }
+
+    @Override
+    public int read(int index, ByteBuffer out) {
+      final int wanted = out.remaining();
+      buf.get(out, getAbsoluteOffset(index), wanted);
+      return wanted;
+    }
+
+    @Override
+    public int size() {
+      return length;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 59d1ff9..481b701 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -15,29 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.BindException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -45,47 +36,26 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.htrace.TraceInfo;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -125,7 +95,7 @@ public class SimpleRpcServer extends RpcServer {
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
   private Listener listener = null;
-  protected Responder responder = null;
+  protected SimpleRpcServerResponder responder = null;
 
   /** Listens on the socket. Creates jobs for the handler threads*/
   private class Listener extends Thread {
@@ -178,7 +148,7 @@ public class SimpleRpcServer extends RpcServer {
 
 
     private class Reader implements Runnable {
-      final private LinkedBlockingQueue<Connection> pendingConnections;
+      final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
       private final Selector readSelector;
 
       Reader() throws IOException {
@@ -206,7 +176,7 @@ public class SimpleRpcServer extends RpcServer {
             // unbridled acceptance of connections that starves the select
             int size = pendingConnections.size();
             for (int i=size; i>0; i--) {
-              Connection conn = pendingConnections.take();
+              SimpleServerRpcConnection conn = pendingConnections.take();
               conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
             }
             readSelector.select();
@@ -238,7 +208,7 @@ public class SimpleRpcServer extends RpcServer {
        * so the connection must be queued.  The reader will drain the queue
        * and update its readSelector before performing the next select
        */
-      public void addConnection(Connection conn) throws IOException {
+      public void addConnection(SimpleServerRpcConnection conn) throws IOException {
         pendingConnections.add(conn);
         readSelector.wakeup();
       }
@@ -314,7 +284,7 @@ public class SimpleRpcServer extends RpcServer {
 
     private void closeCurrentConnection(SelectionKey key, Throwable e) {
       if (key != null) {
-        Connection c = (Connection)key.attachment();
+        SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
         if (c != null) {
           closeConnection(c);
           key.attach(null);
@@ -334,7 +304,7 @@ public class SimpleRpcServer extends RpcServer {
         channel.socket().setTcpNoDelay(tcpNoDelay);
         channel.socket().setKeepAlive(tcpKeepAlive);
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel);
+        SimpleServerRpcConnection c = connectionManager.register(channel);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -349,7 +319,7 @@ public class SimpleRpcServer extends RpcServer {
 
     void doRead(SelectionKey key) throws InterruptedException {
       int count;
-      Connection c = (Connection) key.attachment();
+      SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
       if (c == null) {
         return;
       }
@@ -396,649 +366,6 @@ public class SimpleRpcServer extends RpcServer {
     }
   }
 
-  // Sends responses of RPC back to clients.
-  protected class Responder extends Thread {
-    private final Selector writeSelector;
-    private final Set<Connection> writingCons =
-        Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
-
-    Responder() throws IOException {
-      this.setName("RpcServer.responder");
-      this.setDaemon(true);
-      this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
-      writeSelector = Selector.open(); // create a selector
-    }
-
-    @Override
-    public void run() {
-      LOG.debug(getName() + ": starting");
-      try {
-        doRunLoop();
-      } finally {
-        LOG.info(getName() + ": stopping");
-        try {
-          writeSelector.close();
-        } catch (IOException ioe) {
-          LOG.error(getName() + ": couldn't close write selector", ioe);
-        }
-      }
-    }
-
-    /**
-     * Take the list of the connections that want to write, and register them
-     * in the selector.
-     */
-    private void registerWrites() {
-      Iterator<Connection> it = writingCons.iterator();
-      while (it.hasNext()) {
-        Connection c = it.next();
-        it.remove();
-        SelectionKey sk = c.channel.keyFor(writeSelector);
-        try {
-          if (sk == null) {
-            try {
-              c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
-            } catch (ClosedChannelException e) {
-              // ignore: the client went away.
-              if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
-            }
-          } else {
-            sk.interestOps(SelectionKey.OP_WRITE);
-          }
-        } catch (CancelledKeyException e) {
-          // ignore: the client went away.
-          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
-        }
-      }
-    }
-
-    /**
-     * Add a connection to the list that want to write,
-     */
-    public void registerForWrite(Connection c) {
-      if (writingCons.add(c)) {
-        writeSelector.wakeup();
-      }
-    }
-
-    private void doRunLoop() {
-      long lastPurgeTime = 0;   // last check for old calls.
-      while (running) {
-        try {
-          registerWrites();
-          int keyCt = writeSelector.select(purgeTimeout);
-          if (keyCt == 0) {
-            continue;
-          }
-
-          Set<SelectionKey> keys = writeSelector.selectedKeys();
-          Iterator<SelectionKey> iter = keys.iterator();
-          while (iter.hasNext()) {
-            SelectionKey key = iter.next();
-            iter.remove();
-            try {
-              if (key.isValid() && key.isWritable()) {
-                doAsyncWrite(key);
-              }
-            } catch (IOException e) {
-              LOG.debug(getName() + ": asyncWrite", e);
-            }
-          }
-
-          lastPurgeTime = purge(lastPurgeTime);
-
-        } catch (OutOfMemoryError e) {
-          if (errorHandler != null) {
-            if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OutOfMemoryError");
-              return;
-            }
-          } else {
-            //
-            // we can run out of memory if we have too many threads
-            // log the event and sleep for a minute and give
-            // some thread(s) a chance to finish
-            //
-            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
-            try {
-              Thread.sleep(60000);
-            } catch (InterruptedException ex) {
-              LOG.debug("Interrupted while sleeping");
-              return;
-            }
-          }
-        } catch (Exception e) {
-          LOG.warn(getName() + ": exception in Responder " +
-              StringUtils.stringifyException(e), e);
-        }
-      }
-      LOG.info(getName() + ": stopped");
-    }
-
-    /**
-     * If there were some calls that have not been sent out for a
-     * long time, we close the connection.
-     * @return the time of the purge.
-     */
-    private long purge(long lastPurgeTime) {
-      long now = System.currentTimeMillis();
-      if (now < lastPurgeTime + purgeTimeout) {
-        return lastPurgeTime;
-      }
-
-      ArrayList<Connection> conWithOldCalls = new ArrayList<>();
-      // get the list of channels from list of keys.
-      synchronized (writeSelector.keys()) {
-        for (SelectionKey key : writeSelector.keys()) {
-          Connection connection = (Connection) key.attachment();
-          if (connection == null) {
-            throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
-          }
-          SimpleServerCall call = connection.responseQueue.peekFirst();
-          if (call != null && now > call.lastSentTime + purgeTimeout) {
-            conWithOldCalls.add(call.getConnection());
-          }
-        }
-      }
-
-      // Seems safer to close the connection outside of the synchronized loop...
-      for (Connection connection : conWithOldCalls) {
-        closeConnection(connection);
-      }
-
-      return now;
-    }
-
-    private void doAsyncWrite(SelectionKey key) throws IOException {
-      Connection connection = (Connection) key.attachment();
-      if (connection == null) {
-        throw new IOException("doAsyncWrite: no connection");
-      }
-      if (key.channel() != connection.channel) {
-        throw new IOException("doAsyncWrite: bad channel");
-      }
-
-      if (processAllResponses(connection)) {
-        try {
-          // We wrote everything, so we don't need to be told when the socket is ready for
-          //  write anymore.
-         key.interestOps(0);
-        } catch (CancelledKeyException e) {
-          /* The Listener/reader might have closed the socket.
-           * We don't explicitly cancel the key, so not sure if this will
-           * ever fire.
-           * This warning could be removed.
-           */
-          LOG.warn("Exception while changing ops : " + e);
-        }
-      }
-    }
-
-    /**
-     * Process the response for this call. You need to have the lock on
-     * {@link org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection#responseWriteLock}
-     *
-     * @param call the call
-     * @return true if we proceed the call fully, false otherwise.
-     * @throws IOException
-     */
-    private boolean processResponse(final SimpleServerCall call) throws IOException {
-      boolean error = true;
-      try {
-        // Send as much data as we can in the non-blocking fashion
-        long numBytes = channelWrite(call.getConnection().channel,
-            call.response);
-        if (numBytes < 0) {
-          throw new HBaseIOException("Error writing on the socket " +
-            "for the call:" + call.toShortString());
-        }
-        error = false;
-      } finally {
-        if (error) {
-          LOG.debug(getName() + call.toShortString() + ": output error -- closing");
-          // We will be closing this connection itself. Mark this call as done so that all the
-          // buffer(s) it got from pool can get released
-          call.done();
-          closeConnection(call.getConnection());
-        }
-      }
-
-      if (!call.response.hasRemaining()) {
-        call.done();
-        return true;
-      } else {
-        return false; // Socket can't take more, we will have to come back.
-      }
-    }
-
-    /**
-     * Process all the responses for this connection
-     *
-     * @return true if all the calls were processed or that someone else is doing it.
-     * false if there * is still some work to do. In this case, we expect the caller to
-     * delay us.
-     * @throws IOException
-     */
-    private boolean processAllResponses(final Connection connection) throws IOException {
-      // We want only one writer on the channel for a connection at a time.
-      connection.responseWriteLock.lock();
-      try {
-        for (int i = 0; i < 20; i++) {
-          // protection if some handlers manage to need all the responder
-          SimpleServerCall call = connection.responseQueue.pollFirst();
-          if (call == null) {
-            return true;
-          }
-          if (!processResponse(call)) {
-            connection.responseQueue.addFirst(call);
-            return false;
-          }
-        }
-      } finally {
-        connection.responseWriteLock.unlock();
-      }
-
-      return connection.responseQueue.isEmpty();
-    }
-
-    //
-    // Enqueue a response from the application.
-    //
-    void doRespond(SimpleServerCall call) throws IOException {
-      boolean added = false;
-
-      // If there is already a write in progress, we don't wait. This allows to free the handlers
-      //  immediately for other tasks.
-      if (call.getConnection().responseQueue.isEmpty()
-          && call.getConnection().responseWriteLock.tryLock()) {
-        try {
-          if (call.getConnection().responseQueue.isEmpty()) {
-            // If we're alone, we can try to do a direct call to the socket. It's
-            //  an optimisation to save on context switches and data transfer between cores..
-            if (processResponse(call)) {
-              return; // we're done.
-            }
-            // Too big to fit, putting ahead.
-            call.getConnection().responseQueue.addFirst(call);
-            added = true; // We will register to the selector later, outside of the lock.
-          }
-        } finally {
-          call.getConnection().responseWriteLock.unlock();
-        }
-      }
-
-      if (!added) {
-        call.getConnection().responseQueue.addLast(call);
-      }
-      call.responder.registerForWrite(call.getConnection());
-
-      // set the serve time when the response has to be sent later
-      call.lastSentTime = System.currentTimeMillis();
-    }
-  }
-
-  /** Reads calls from a connection and queues them for handling. */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      value="VO_VOLATILE_INCREMENT",
-      justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
-  public class Connection extends RpcServer.Connection {
-
-    protected SocketChannel channel;
-    private ByteBuff data;
-    private ByteBuffer dataLengthBuffer;
-    protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue = new ConcurrentLinkedDeque<>();
-    private final Lock responseWriteLock = new ReentrantLock();
-    private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
-    private long lastContact;
-    protected Socket socket;
-
-    public Connection(SocketChannel channel, long lastContact) {
-      super();
-      this.channel = channel;
-      this.lastContact = lastContact;
-      this.data = null;
-      this.dataLengthBuffer = ByteBuffer.allocate(4);
-      this.socket = channel.socket();
-      this.addr = socket.getInetAddress();
-      if (addr == null) {
-        this.hostAddress = "*Unknown*";
-      } else {
-        this.hostAddress = addr.getHostAddress();
-      }
-      this.remotePort = socket.getPort();
-      if (socketSendBufferSize != 0) {
-        try {
-          socket.setSendBufferSize(socketSendBufferSize);
-        } catch (IOException e) {
-          LOG.warn("Connection: unable to set socket send buffer size to " +
-                   socketSendBufferSize);
-        }
-      }
-      this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
-          null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
-      this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
-          null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
-          reservoir, cellBlockBuilder, null, responder);
-      this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null,
-          null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir,
-          cellBlockBuilder, null, responder);
-    }
-
-    public void setLastContact(long lastContact) {
-      this.lastContact = lastContact;
-    }
-
-    public long getLastContact() {
-      return lastContact;
-    }
-
-    /* Return true if the connection has no outstanding rpc */
-    private boolean isIdle() {
-      return rpcCount.sum() == 0;
-    }
-
-    /* Decrement the outstanding RPC count */
-    protected void decRpcCount() {
-      rpcCount.decrement();
-    }
-
-    /* Increment the outstanding RPC count */
-    protected void incRpcCount() {
-      rpcCount.increment();
-    }
-
-    private int readPreamble() throws IOException {
-      int count;
-      // Check for 'HBas' magic.
-      this.dataLengthBuffer.flip();
-      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
-        return doBadPreambleHandling("Expected HEADER=" +
-            Bytes.toStringBinary(HConstants.RPC_HEADER) +
-            " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
-            " from " + toString());
-      }
-      // Now read the next two bytes, the version and the auth to use.
-      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
-      count = channelRead(channel, versionAndAuthBytes);
-      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
-        return count;
-      }
-      int version = versionAndAuthBytes.get(0);
-      byte authbyte = versionAndAuthBytes.get(1);
-      this.authMethod = AuthMethod.valueOf(authbyte);
-      if (version != CURRENT_VERSION) {
-        String msg = getFatalConnectionString(version, authbyte);
-        return doBadPreambleHandling(msg, new WrongVersionException(msg));
-      }
-      if (authMethod == null) {
-        String msg = getFatalConnectionString(version, authbyte);
-        return doBadPreambleHandling(msg, new BadAuthException(msg));
-      }
-      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-        if (allowFallbackToSimpleAuth) {
-          metrics.authenticationFallback();
-          authenticatedWithFallback = true;
-        } else {
-          AccessDeniedException ae = new AccessDeniedException("Authentication is required");
-          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
-          authFailedCall.sendResponseIfReady();
-          throw ae;
-        }
-      }
-      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
-        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
-            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
-        authMethod = AuthMethod.SIMPLE;
-        // client has already sent the initial Sasl message and we
-        // should ignore it. Both client and server should fall back
-        // to simple auth from now on.
-        skipInitialSaslHandshake = true;
-      }
-      if (authMethod != AuthMethod.SIMPLE) {
-        useSasl = true;
-      }
-
-      dataLengthBuffer.clear();
-      connectionPreambleRead = true;
-      return count;
-    }
-
-    private int read4Bytes() throws IOException {
-      if (this.dataLengthBuffer.remaining() > 0) {
-        return channelRead(channel, this.dataLengthBuffer);
-      } else {
-        return 0;
-      }
-    }
-
-    /**
-     * Read off the wire. If there is not enough data to read, update the connection state with
-     *  what we have and returns.
-     * @return Returns -1 if failure (and caller will close connection), else zero or more.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public int readAndProcess() throws IOException, InterruptedException {
-      // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
-      // does, read in the rest of the connection preamble, the version and the auth method.
-      // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
-      // length into the 4-byte this.dataLengthBuffer.
-      int count = read4Bytes();
-      if (count < 0 || dataLengthBuffer.remaining() > 0) {
-        return count;
-      }
-
-      // If we have not read the connection setup preamble, look to see if that is on the wire.
-      if (!connectionPreambleRead) {
-        count = readPreamble();
-        if (!connectionPreambleRead) {
-          return count;
-        }
-
-        count = read4Bytes();
-        if (count < 0 || dataLengthBuffer.remaining() > 0) {
-          return count;
-        }
-      }
-
-      // We have read a length and we have read the preamble.  It is either the connection header
-      // or it is a request.
-      if (data == null) {
-        dataLengthBuffer.flip();
-        int dataLength = dataLengthBuffer.getInt();
-        if (dataLength == RpcClient.PING_CALL_ID) {
-          if (!useWrap) { //covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0;  //ping message
-          }
-        }
-        if (dataLength < 0) { // A data length of zero is legal.
-          throw new DoNotRetryIOException("Unexpected data length "
-              + dataLength + "!! from " + getHostAddress());
-        }
-
-        if (dataLength > maxRequestSize) {
-          String msg = "RPC data length of " + dataLength + " received from "
-              + getHostAddress() + " is greater than max allowed "
-              + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
-              + "\" on server to override this limit (not recommended)";
-          LOG.warn(msg);
-
-          if (connectionHeaderRead && connectionPreambleRead) {
-            incRpcCount();
-            // Construct InputStream for the non-blocking SocketChannel
-            // We need the InputStream because we want to read only the request header
-            // instead of the whole rpc.
-            ByteBuffer buf = ByteBuffer.allocate(1);
-            InputStream is = new InputStream() {
-              @Override
-              public int read() throws IOException {
-                channelRead(channel, buf);
-                buf.flip();
-                int x = buf.get();
-                buf.flip();
-                return x;
-              }
-            };
-            CodedInputStream cis = CodedInputStream.newInstance(is);
-            int headerSize = cis.readRawVarint32();
-            Message.Builder builder = RequestHeader.newBuilder();
-            ProtobufUtil.mergeFrom(builder, cis, headerSize);
-            RequestHeader header = (RequestHeader) builder.build();
-
-            // Notify the client about the offending request
-            SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service,
-                null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
-                reservoir, cellBlockBuilder, null, responder);
-            metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
-            // Make sure the client recognizes the underlying exception
-            // Otherwise, throw a DoNotRetryIOException.
-            if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
-                RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
-              setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
-            } else {
-              setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
-            }
-            // We are going to close the connection, make sure we process the response
-            // before that. In rare case when this fails, we still close the connection.
-            responseWriteLock.lock();
-            responder.processResponse(reqTooBig);
-            responseWriteLock.unlock();
-          }
-          // Close the connection
-          return -1;
-        }
-
-        // Initialize this.data with a ByteBuff.
-        // This call will allocate a ByteBuff to read request into and assign to this.data
-        // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
-        // assign to this.callCleanup
-        initByteBuffToReadInto(dataLength);
-
-        // Increment the rpc count. This counter will be decreased when we write
-        //  the response.  If we want the connection to be detected as idle properly, we
-        //  need to keep the inc / dec correct.
-        incRpcCount();
-      }
-
-      count = channelDataRead(channel, data);
-
-      if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
-        process();
-      }
-
-      return count;
-    }
-
-    // It creates the ByteBuff and CallCleanup and assign to Connection instance.
-    private void initByteBuffToReadInto(int length) {
-      // We create random on heap buffers are read into those when
-      // 1. ByteBufferPool is not there.
-      // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
-      // waste then. Also if all the reqs are of this size, we will be creating larger sized
-      // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
-      // RegionOpen.
-      // 3. If it is an initial handshake signal or initial connection request. Any way then
-      // condition 2 itself will match
-      // 4. When SASL use is ON.
-      if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
-          || length < minSizeForReservoirUse) {
-        this.data = new SingleByteBuff(ByteBuffer.allocate(length));
-      } else {
-        Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(reservoir,
-            minSizeForReservoirUse, length);
-        this.data = pair.getFirst();
-        this.callCleanup = pair.getSecond();
-      }
-    }
-
-    protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
-      int count = buf.read(channel);
-      if (count > 0) {
-        metrics.receivedBytes(count);
-      }
-      return count;
-    }
-
-    /**
-     * Process the data buffer and clean the connection state for the next call.
-     */
-    private void process() throws IOException, InterruptedException {
-      data.rewind();
-      try {
-        if (skipInitialSaslHandshake) {
-          skipInitialSaslHandshake = false;
-          return;
-        }
-
-        if (useSasl) {
-          saslReadAndProcess(data);
-        } else {
-          processOneRpc(data);
-        }
-
-      } finally {
-        dataLengthBuffer.clear(); // Clean for the next call
-        data = null; // For the GC
-        this.callCleanup = null;
-      }
-    }
-
-    private int doBadPreambleHandling(final String msg) throws IOException {
-      return doBadPreambleHandling(msg, new FatalConnectionException(msg));
-    }
-
-    private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
-      LOG.warn(msg);
-      SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
-          null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
-      setupResponse(null, fakeCall, e, msg);
-      responder.doRespond(fakeCall);
-      // Returning -1 closes out the connection.
-      return -1;
-    }
-
-    @Override
-    public synchronized void close() {
-      disposeSasl();
-      data = null;
-      callCleanup = null;
-      if (!channel.isOpen())
-        return;
-      try {socket.shutdownOutput();} catch(Exception ignored) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Ignored exception", ignored);
-        }
-      }
-      if (channel.isOpen()) {
-        try {channel.close();} catch(Exception ignored) {}
-      }
-      try {
-        socket.close();
-      } catch(Exception ignored) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Ignored exception", ignored);
-        }
-      }
-    }
-
-    @Override
-    public boolean isConnectionOpen() {
-      return channel.isOpen();
-    }
-
-    @Override
-    public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md,
-        RequestHeader header, Message param, CellScanner cellScanner,
-        RpcServer.Connection connection, long size, TraceInfo tinfo,
-        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
-      return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size,
-          tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
-          reqCleanup, responder);
-    }
-  }
-
-
   /**
    * Constructs a server listening on the named port and address.
    * @param server hosting instance of {@link Server}. We will do authentications if an
@@ -1065,7 +392,7 @@ public class SimpleRpcServer extends RpcServer {
     this.port = listener.getAddress().getPort();
 
     // Create the responder here
-    responder = new Responder();
+    responder = new SimpleRpcServerResponder(this);
     connectionManager = new ConnectionManager();
     initReconfigurable(conf);
 
@@ -1076,11 +403,11 @@ public class SimpleRpcServer extends RpcServer {
    * Subclasses of HBaseServer can override this to provide their own
    * Connection implementations.
    */
-  protected Connection getConnection(SocketChannel channel, long time) {
-    return new Connection(channel, time);
+  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
+    return new SimpleServerRpcConnection(this, channel, time);
   }
 
-  protected void closeConnection(Connection connection) {
+  protected void closeConnection(SimpleServerRpcConnection connection) {
     connectionManager.close(connection);
   }
 
@@ -1228,7 +555,7 @@ public class SimpleRpcServer extends RpcServer {
 
   private class ConnectionManager {
     final private AtomicInteger count = new AtomicInteger();
-    final private Set<Connection> connections;
+    final private Set<SimpleServerRpcConnection> connections;
 
     final private Timer idleScanTimer;
     final private int idleScanThreshold;
@@ -1250,11 +577,11 @@ public class SimpleRpcServer extends RpcServer {
       // create a set with concurrency -and- a thread-safe iterator, add 2
       // for listener and idle closer threads
       this.connections = Collections.newSetFromMap(
-          new ConcurrentHashMap<Connection,Boolean>(
+          new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
               maxConnectionQueueSize, 0.75f, readThreads+2));
     }
 
-    private boolean add(Connection connection) {
+    private boolean add(SimpleServerRpcConnection connection) {
       boolean added = connections.add(connection);
       if (added) {
         count.getAndIncrement();
@@ -1262,7 +589,7 @@ public class SimpleRpcServer extends RpcServer {
       return added;
     }
 
-    private boolean remove(Connection connection) {
+    private boolean remove(SimpleServerRpcConnection connection) {
       boolean removed = connections.remove(connection);
       if (removed) {
         count.getAndDecrement();
@@ -1274,12 +601,12 @@ public class SimpleRpcServer extends RpcServer {
       return count.get();
     }
 
-    Connection[] toArray() {
-      return connections.toArray(new Connection[0]);
+    SimpleServerRpcConnection[] toArray() {
+      return connections.toArray(new SimpleServerRpcConnection[0]);
     }
 
-    Connection register(SocketChannel channel) {
-      Connection connection = getConnection(channel, System.currentTimeMillis());
+    SimpleServerRpcConnection register(SocketChannel channel) {
+      SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis());
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
@@ -1291,7 +618,7 @@ public class SimpleRpcServer extends RpcServer {
       return connection;
     }
 
-    boolean close(Connection connection) {
+    boolean close(SimpleServerRpcConnection connection) {
       boolean exists = remove(connection);
       if (exists) {
         if (LOG.isDebugEnabled()) {
@@ -1314,7 +641,7 @@ public class SimpleRpcServer extends RpcServer {
       // during the iteration, but that's ok because they won't
       // be idle yet anyway and will be caught on next scan
       int closed = 0;
-      for (Connection connection : connections) {
+      for (SimpleServerRpcConnection connection : connections) {
         // stop if connections dropped below threshold unless scanning all
         if (!scanAll && size() < idleScanThreshold) {
           break;
@@ -1332,7 +659,7 @@ public class SimpleRpcServer extends RpcServer {
     void closeAll() {
       // use a copy of the connections to be absolutely sure the concurrent
       // iterator doesn't miss a connection
-      for (Connection connection : toArray()) {
+      for (SimpleServerRpcConnection connection : toArray()) {
         close(connection);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
new file mode 100644
index 0000000..5f072a9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Sends responses of RPC back to clients.
+ */
+@InterfaceAudience.Private
+class SimpleRpcServerResponder extends Thread {
+  /** Owning server; source of running, purgeTimeout, errorHandler and connection closing. */
+  private final SimpleRpcServer simpleRpcServer;
+  private final Selector writeSelector;
+  private final Set<SimpleServerRpcConnection> writingCons =
+      Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>());
+
+  SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
+    this.simpleRpcServer = simpleRpcServer;
+    this.setName("RpcServer.responder");
+    this.setDaemon(true); // daemon thread: does not keep the JVM alive on its own
+    this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
+    writeSelector = Selector.open(); // an IOException from here propagates to the caller
+  }
+
+  @Override
+  public void run() {
+    SimpleRpcServer.LOG.debug(getName() + ": starting");
+    try {
+      doRunLoop(); // returns when the server stops running, or early on OOME/interrupt
+    } finally { // release the selector however the loop ended
+      SimpleRpcServer.LOG.info(getName() + ": stopping");
+      try {
+        writeSelector.close();
+      } catch (IOException ioe) {
+        SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
+      }
+    }
+  }
+
+  /**
+   * Drains writingCons and requests OP_WRITE on the write selector for each drained connection.
+   */
+  private void registerWrites() {
+    Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
+    while (it.hasNext()) {
+      SimpleServerRpcConnection c = it.next();
+      it.remove(); // taken off the pending set whether or not the registration below succeeds
+      SelectionKey sk = c.channel.keyFor(writeSelector);
+      try {
+        if (sk == null) { // channel has no key with this selector yet
+          try {
+            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
+          } catch (ClosedChannelException e) {
+            // ignore: the client went away.
+            if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
+          }
+        } else {
+          sk.interestOps(SelectionKey.OP_WRITE);
+        }
+      } catch (CancelledKeyException e) {
+        // ignore: the client went away.
+        if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
+      }
+    }
+  }
+
+  /**
+   * Add a connection to the set of connections that want to write.
+   */
+  public void registerForWrite(SimpleServerRpcConnection c) {
+    if (writingCons.add(c)) {
+      writeSelector.wakeup(); // unblock select() so the run loop reaches registerWrites() soon
+    }
+  }
+
+  private void doRunLoop() {
+    long lastPurgeTime = 0; // last check for old calls.
+    while (this.simpleRpcServer.running) {
+      try {
+        registerWrites();
+        int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
+        // NOTE(review): this early continue also skips purge() below, so old calls are only
+        // purged on iterations where at least one key was selected -- confirm this is intended.
+        if (keyCt == 0) {
+          continue;
+        }
+
+        Set<SelectionKey> keys = writeSelector.selectedKeys();
+        Iterator<SelectionKey> iter = keys.iterator();
+        while (iter.hasNext()) {
+          SelectionKey key = iter.next();
+          iter.remove();
+          try {
+            if (key.isValid() && key.isWritable()) {
+              doAsyncWrite(key);
+            }
+          } catch (IOException e) {
+            SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
+          }
+        }
+
+        lastPurgeTime = purge(lastPurgeTime);
+
+      } catch (OutOfMemoryError e) {
+        if (this.simpleRpcServer.errorHandler != null) {
+          if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
+            SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
+            return;
+          }
+        } else {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
+          try {
+            Thread.sleep(60000);
+          } catch (InterruptedException ex) {
+            SimpleRpcServer.LOG.debug("Interrupted while sleeping");
+            return;
+          }
+        }
+      } catch (Exception e) {
+        SimpleRpcServer.LOG
+            .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
+      }
+    }
+    SimpleRpcServer.LOG.info(getName() + ": stopped");
+  }
+
+  /**
+   * If there were some calls that have not been sent out for a long time, we close the connection.
+   * @return lastPurgeTime unchanged if purgeTimeout has not elapsed since it, else the current time.
+   */
+  private long purge(long lastPurgeTime) {
+    long now = System.currentTimeMillis();
+    if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
+      return lastPurgeTime;
+    }
+
+    ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
+    // get the list of channels from list of keys.
+    synchronized (writeSelector.keys()) {
+      for (SelectionKey key : writeSelector.keys()) {
+        SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
+        if (connection == null) {
+          throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
+        }
+        SimpleServerCall call = connection.responseQueue.peekFirst();
+        if (call != null && now > call.lastSentTime + this.simpleRpcServer.purgeTimeout) {
+          conWithOldCalls.add(call.getConnection());
+        }
+      }
+    }
+
+    // Seems safer to close the connection outside of the synchronized loop...
+    for (SimpleServerRpcConnection connection : conWithOldCalls) {
+      this.simpleRpcServer.closeConnection(connection);
+    }
+
+    return now;
+  }
+
+  /** Writes queued responses for the key's connection; clears write interest when none remain. */
+  private void doAsyncWrite(SelectionKey key) throws IOException {
+    SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
+    if (connection == null) {
+      throw new IOException("doAsyncWrite: no connection");
+    }
+    // Sanity check: the key must belong to the channel of the connection attached to it.
+    if (key.channel() != connection.channel) {
+      throw new IOException("doAsyncWrite: bad channel");
+    }
+
+    if (processAllResponses(connection)) {
+      try {
+        // We wrote everything, so we don't need to be told when the socket is ready for
+        // write anymore.
+        key.interestOps(0);
+      } catch (CancelledKeyException e) {
+        // The Listener/reader might have closed the socket. We don't explicitly cancel the key,
+        // so not sure if this will ever fire. This warning could be removed.
+        SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
+      }
+    }
+  }
+
+  /**
+   * Process the response for this call. You need to have the lock on
+   * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
+   * @param call the call
+   * @return true if we processed the call fully, false otherwise.
+   * @throws IOException if the channel write fails; the connection is closed before rethrowing
+   */
+  boolean processResponse(final SimpleServerCall call) throws IOException {
+    boolean error = true;
+    try {
+      // Send as much data as we can in the non-blocking fashion
+      long numBytes =
+          this.simpleRpcServer.channelWrite(call.getConnection().channel, call.response);
+      if (numBytes < 0) {
+        throw new HBaseIOException(
+            "Error writing on the socket " + "for the call:" + call.toShortString());
+      }
+      error = false;
+    } finally {
+      if (error) {
+        SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output error -- closing");
+        // We will be closing this connection itself. Mark this call as done so that all the
+        // buffer(s) it got from pool can get released
+        call.done();
+        this.simpleRpcServer.closeConnection(call.getConnection());
+      }
+    }
+
+    if (!call.response.hasRemaining()) {
+      call.done();
+      return true;
+    } else {
+      return false; // Socket can't take more, we will have to come back.
+    }
+  }
+
+  /**
+   * Process all the responses for this connection.
+   * @return true if no queued response is left for the connection, false if there is still
+   *         some work to do. In this case, we expect the caller to delay us.
+   * @throws IOException if writing one of the responses fails
+   */
+  private boolean processAllResponses(final SimpleServerRpcConnection connection)
+      throws IOException {
+    // We want only one writer on the channel for a connection at a time.
+    connection.responseWriteLock.lock();
+    try {
+      for (int i = 0; i < 20; i++) {
+        // protection if some handlers manage to need all the responder
+        SimpleServerCall call = connection.responseQueue.pollFirst();
+        if (call == null) {
+          return true;
+        }
+        if (!processResponse(call)) {
+          connection.responseQueue.addFirst(call);
+          return false;
+        }
+      }
+    } finally {
+      connection.responseWriteLock.unlock();
+    }
+
+    return connection.responseQueue.isEmpty();
+  }
+
+  /**
+   * Enqueue a response from the application, trying a direct write on the calling thread first.
+   * @param call the call whose response should be sent
+   * @throws IOException if the direct write fails
+   */
+  void doRespond(SimpleServerCall call) throws IOException {
+    final SimpleServerRpcConnection conn = call.getConnection();
+    boolean added = false;
+    // If there is already a write in progress, we don't wait. This allows to free the handlers
+    // immediately for other tasks.
+    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
+      try {
+        if (conn.responseQueue.isEmpty()) {
+          // If we're alone, we can try to do a direct call to the socket. It's
+          // an optimisation to save on context switches and data transfer between cores..
+          if (processResponse(call)) {
+            return; // we're done.
+          }
+          // Too big to fit, putting ahead. Stamp it before it becomes visible in the queue:
+          // purge() may peek at it right away and would treat an unset (0) time as expired.
+          call.lastSentTime = System.currentTimeMillis();
+          conn.responseQueue.addFirst(call);
+          added = true; // We will register to the selector later, outside of the lock.
+        }
+      } finally {
+        conn.responseWriteLock.unlock();
+      }
+    }
+    if (!added) {
+      call.lastSentTime = System.currentTimeMillis(); // stamp before enqueueing, as above
+      conn.responseQueue.addLast(call);
+    }
+    call.responder.registerForWrite(conn);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index b82d348..af575ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection;
-import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
@@ -37,19 +35,19 @@ import org.apache.htrace.TraceInfo;
  * result.
  */
 @InterfaceAudience.Private
-class SimpleServerCall extends ServerCall {
+class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
 
   long lastSentTime;
 
-  final Responder responder;
+  final SimpleRpcServerResponder responder;
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
       justification = "Can't figure why this complaint is happening... see below")
   SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
-      RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection,
-      long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout,
-      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
-      Responder responder) {
+      RequestHeader header, Message param, CellScanner cellScanner,
+      SimpleServerRpcConnection connection, long size, TraceInfo tinfo,
+      final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) {
     super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
         receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
     this.responder = responder;
@@ -73,7 +71,7 @@ class SimpleServerCall extends ServerCall {
     this.responder.doRespond(this);
   }
 
-  Connection getConnection() {
-    return (Connection) this.connection;
+  SimpleServerRpcConnection getConnection() {
+    return (SimpleServerRpcConnection) this.connection;
   }
 }
\ No newline at end of file


Mime
View raw message