hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [4/4] hbase git commit: HBASE-12597 Add RpcClient interface and enable changing of RpcClient implementation (Jurriaan Mous)
Date Mon, 08 Dec 2014 23:23:58 GMT
HBASE-12597 Add RpcClient interface and enable changing of RpcClient implementation (Jurriaan
Mous)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d880e3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d880e3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d880e3f

Branch: refs/heads/branch-1
Commit: 1d880e3f60989437ab10ecc285037b32bc694464
Parents: 823656b
Author: stack <stack@apache.org>
Authored: Mon Dec 8 15:23:38 2014 -0800
Committer: stack <stack@apache.org>
Committed: Mon Dec 8 15:23:38 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  |   19 +-
 .../client/PreemptiveFastFailInterceptor.java   |    2 +-
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  177 ++
 .../java/org/apache/hadoop/hbase/ipc/Call.java  |  127 ++
 .../hadoop/hbase/ipc/CallTimeoutException.java  |   35 +
 .../apache/hadoop/hbase/ipc/ConnectionId.java   |   78 +
 .../hadoop/hbase/ipc/FailedServerException.java |   37 +
 .../apache/hadoop/hbase/ipc/FailedServers.java  |   79 +
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |   22 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  | 1745 +-----------------
 .../hadoop/hbase/ipc/RpcClientFactory.java      |   70 +
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 1383 ++++++++++++++
 .../hbase/zookeeper/MetaTableLocator.java       |    4 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |    2 +-
 .../hadoop/hbase/master/AssignmentManager.java  |    2 +-
 .../hbase/regionserver/HRegionServer.java       |   12 +-
 .../client/TestClientScannerRPCTimeout.java     |    4 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |   52 +-
 .../hbase/client/TestFromClientSideNoCodec.java |    6 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |    2 +-
 .../hbase/filter/FilterTestingCluster.java      |    4 +-
 .../apache/hadoop/hbase/ipc/TestDelayedRpc.java |   15 +-
 .../hadoop/hbase/ipc/TestHBaseClient.java       |    2 +-
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |   20 +-
 .../hadoop/hbase/ipc/TestProtoBufRpc.java       |    4 +-
 .../hbase/master/TestHMasterRPCException.java   |    5 +-
 .../hadoop/hbase/security/TestSecureRPC.java    |    8 +-
 .../security/token/TestTokenAuthentication.java |    5 +-
 .../snapshot/TestFlushSnapshotFromClient.java   |    3 +-
 29 files changed, 2115 insertions(+), 1809 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 1026d86..df96274 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -615,7 +616,7 @@ class ConnectionManager {
       this.registry = setupRegistry();
       retrieveClusterId();
 
-      this.rpcClient = new RpcClient(this.conf, this.clusterId);
+      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
 
       // Do we publish the status?
@@ -635,7 +636,7 @@ class ConnectionManager {
                 @Override
                 public void newDead(ServerName sn) {
                   clearCaches(sn);
-                  rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+                  rpcClient.cancelConnections(sn);
                 }
               }, conf, listenerClass);
         }
@@ -781,18 +782,6 @@ class ConnectionManager {
 
     /**
      * For tests only.
-     * @param rpcClient Client we should use instead.
-     * @return Previous rpcClient
-     */
-    @VisibleForTesting
-    RpcClient setRpcClient(final RpcClient rpcClient) {
-      RpcClient oldRpcClient = this.rpcClient;
-      this.rpcClient = rpcClient;
-      return oldRpcClient;
-    }
-
-    /**
-     * For tests only.
      */
     @VisibleForTesting
     RpcClient getRpcClient() {
@@ -2318,7 +2307,7 @@ class ConnectionManager {
         clusterStatusListener.close();
       }
       if (rpcClient != null) {
-        rpcClient.stop();
+        rpcClient.close();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
index 4256120..6fb2de3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
new file mode 100644
index 0000000..766ad8f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.net.SocketAddress;
+
+/**
+ * Provides the basics for a RpcClient implementation like configuration and Logging.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractRpcClient implements RpcClient {
+  public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
+
+  protected final Configuration conf;
+  protected String clusterId;
+  protected final SocketAddress localAddr;
+
+  protected UserProvider userProvider;
+  protected final IPCUtil ipcUtil;
+
+  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than
this
+  // time (in ms), it will be closed at any moment.
+  protected final int maxRetries; //the max. no. of retries for socket connections
+  protected final long failureSleep; // Time to sleep before retry on failure.
+  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+  protected final boolean tcpKeepAlive; // if T then use keepalives
+  protected final Codec codec;
+  protected final CompressionCodec compressor;
+  protected final boolean fallbackAllowed;
+
+  protected final int connectTO;
+  protected final int readTO;
+  protected final int writeTO;
+
+  /**
+   * Construct an IPC client for the cluster <code>clusterId</code>
+   *
+   * @param conf configuration
+   * @param clusterId the cluster id
+   * @param localAddr client socket bind address.
+   */
+  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr)
{
+    this.userProvider = UserProvider.instantiate(conf);
+    this.localAddr = localAddr;
+    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
+    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
+    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
+    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
+    this.ipcUtil = new IPCUtil(conf);
+
+    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
+    this.conf = conf;
+    this.codec = getCodec();
+    this.compressor = getCompressor(conf);
+    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
+    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
+    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+
+    // login the server principal (if using secure Hadoop)
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
+          ", tcpKeepAlive=" + this.tcpKeepAlive +
+          ", tcpNoDelay=" + this.tcpNoDelay +
+          ", connectTO=" + this.connectTO +
+          ", readTO=" + this.readTO +
+          ", writeTO=" + this.writeTO +
+          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
+          ", maxRetries=" + this.maxRetries +
+          ", fallbackAllowed=" + this.fallbackAllowed +
+          ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+    }
+  }
+
+  @VisibleForTesting
+  public static String getDefaultCodec(final Configuration c) {
+    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
+    // Configuration will complain -- then no default codec (and we'll pb everything).  Else
+    // default is KeyValueCodec
+    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
+  }
+
+  /**
+   * Encapsulate the ugly casting and RuntimeException conversion in private method.
+   * @return Codec to use on this client.
+   */
+  Codec getCodec() {
+    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
+    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
+    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
+    if (className == null || className.length() == 0) return null;
+    try {
+      return (Codec)Class.forName(className).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed getting codec " + className, e);
+    }
+  }
+
+  /**
+   * Encapsulate the ugly casting and RuntimeException conversion in private method.
+   * @param conf configuration
+   * @return The compressor to use on this client.
+   */
+  private static CompressionCodec getCompressor(final Configuration conf) {
+    String className = conf.get("hbase.client.rpc.compressor", null);
+    if (className == null || className.isEmpty()) return null;
+    try {
+        return (CompressionCodec)Class.forName(className).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed getting compressor " + className, e);
+    }
+  }
+
+  /**
+   * Return the pool type specified in the configuration, which must be set to
+   * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
+   * otherwise default to the former.
+   *
+   * For applications with many user threads, use a small round-robin pool. For
+   * applications with few user threads, you may want to try using a
+   * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
+   * instances should not exceed the operating system's hard limit on the number of
+   * connections.
+   *
+   * @param config configuration
+   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
+   */
+  protected static PoolMap.PoolType getPoolType(Configuration config) {
+    return PoolMap.PoolType
+        .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
+            PoolMap.PoolType.ThreadLocal);
+  }
+
+  /**
+   * Return the pool size specified in the configuration, which is applicable only if
+   * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
+   *
+   * @param config configuration
+   * @return the maximum pool size
+   */
+  protected static int getPoolSize(Configuration config) {
+    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
new file mode 100644
index 0000000..df32730
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+
+/** A call waiting for a value. */
+@InterfaceAudience.Private
+public class Call {
+  final int id;                                 // call id
+  final Message param;                          // rpc request method param object
+  /**
+   * Optionally has cells when making call.  Optionally has cells set on response.  Used
+   * passing cells to the rpc and receiving the response.
+   */
+  CellScanner cells;
+  Message response;                             // value, null if error
+  // The return type.  Used to create shell into which we deserialize the response if any.
+  Message responseDefaultType;
+  IOException error;                            // exception, null if value
+  volatile boolean done;                                 // true when call is done
+  long startTime;
+  final Descriptors.MethodDescriptor md;
+  final int timeout; // timeout in millisecond for this call; 0 means infinite.
+
+  protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
+      final CellScanner cells, final Message responseDefaultType, int timeout) {
+    this.param = param;
+    this.md = md;
+    this.cells = cells;
+    this.startTime = EnvironmentEdgeManager.currentTime();
+    this.responseDefaultType = responseDefaultType;
+    this.id = id;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
+   * @return true if the call is on timeout, false otherwise.
+   */
+  public boolean checkAndSetTimeout() {
+    if (timeout == 0){
+      return false;
+    }
+
+    long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
+    if (waitTime >= timeout) {
+      IOException ie = new CallTimeoutException("Call id=" + id +
+          ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
+      setException(ie); // includes a notify
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public int remainingTime() {
+    if (timeout == 0) {
+      return Integer.MAX_VALUE;
+    }
+
+    int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
+    return remaining > 0 ? remaining : 0;
+  }
+
+  @Override
+  public String toString() {
+    return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
+      (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
+  }
+
+  /** Indicate when the call is complete and the
+   * value or error are available.  Notifies by default.  */
+  protected synchronized void callComplete() {
+    this.done = true;
+    notify();                                 // notify caller
+  }
+
+  /** Set the exception when there is an error.
+   * Notify the caller the call is done.
+   *
+   * @param error exception thrown by the call; either local or remote
+   */
+  public void setException(IOException error) {
+    this.error = error;
+    callComplete();
+  }
+
+  /**
+   * Set the return value when there is no error.
+   * Notify the caller the call is done.
+   *
+   * @param response return value of the call.
+   * @param cells Can be null
+   */
+  public void setResponse(Message response, final CellScanner cells) {
+    this.response = response;
+    this.cells = cells;
+    callComplete();
+  }
+
+  public long getStartTime() {
+    return this.startTime;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
new file mode 100644
index 0000000..a81e5d1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * Client-side call timeout
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallTimeoutException extends IOException {
+  public CallTimeoutException(final String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
new file mode 100644
index 0000000..a62d415
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+
+import java.net.InetSocketAddress;
+
+/**
+ * This class holds the address and the user ticket, etc. The client connections
+ * to servers are uniquely identified by <remoteAddress, ticket, serviceName>
+ */
+@InterfaceAudience.Private
+public class ConnectionId {
+  final InetSocketAddress address;
+  final User ticket;
+  private static final int PRIME = 16777619;
+  final String serviceName;
+
+  public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
+    this.address = address;
+    this.ticket = ticket;
+    this.serviceName = serviceName;
+  }
+
+  public String getServiceName() {
+    return this.serviceName;
+  }
+
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
+  public User getTicket() {
+    return ticket;
+  }
+
+  @Override
+  public String toString() {
+    return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+   if (obj instanceof ConnectionId) {
+     ConnectionId id = (ConnectionId) obj;
+     return address.equals(id.address) &&
+            ((ticket != null && ticket.equals(id.ticket)) ||
+             (ticket == id.ticket)) &&
+             this.serviceName == id.serviceName;
+   }
+   return false;
+  }
+
+  @Override  // simply use the default Object#hashcode() ?
+  public int hashCode() {
+    int hashcode = (address.hashCode() +
+      PRIME * (PRIME * this.serviceName.hashCode() ^
+      (ticket == null ? 0 : ticket.hashCode())));
+    return hashcode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
new file mode 100644
index 0000000..12f6451
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that we're trying to connect to a already known as dead server. We will want
to
+ *  retry: we're getting this because the region location was wrong, or because
+ *  the server just died, in which case the retry loop will help us to wait for the
+ *  regions to recover.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FailedServerException extends HBaseIOException {
+  public FailedServerException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
new file mode 100644
index 0000000..16ec16c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A class to manage a list of servers that failed recently.
+ */
+@InterfaceAudience.Private
+public class FailedServers {
+  private final LinkedList<Pair<Long, String>> failedServers = new
+      LinkedList<Pair<Long, String>>();
+  private final int recheckServersTimeout;
+
+  public FailedServers(Configuration conf) {
+    this.recheckServersTimeout = conf.getInt(
+        RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+  }
+
+  /**
+   * Add an address to the list of the failed servers list.
+   */
+  public synchronized void addToFailedServers(InetSocketAddress address) {
+    final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
+    failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+  }
+
+  /**
+   * Check if the server should be considered as bad. Clean the old entries of the list.
+   *
+   * @return true if the server is in the failed servers list
+   */
+  public synchronized boolean isFailedServer(final InetSocketAddress address) {
+    if (failedServers.isEmpty()) {
+      return false;
+    }
+
+    final String lookup = address.toString();
+    final long now = EnvironmentEdgeManager.currentTime();
+
+    // iterate, looking for the search entry and cleaning expired entries
+    Iterator<Pair<Long, String>> it = failedServers.iterator();
+    while (it.hasNext()) {
+      Pair<Long, String> cur = it.next();
+      if (cur.getFirst() < now) {
+        it.remove();
+      } else {
+        if (lookup.equals(cur.getSecond())) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 67e2524..b7e7728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -51,7 +51,7 @@ import com.google.protobuf.Message;
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
-class IPCUtil {
+public class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
   /**
    * How much we think the decompressor will expand the original compressed content.
@@ -60,7 +60,7 @@ class IPCUtil {
   private final int cellBlockBuildingInitialBufferSize;
   private final Configuration conf;
 
-  IPCUtil(final Configuration conf) {
+  public IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
@@ -81,14 +81,14 @@ class IPCUtil {
    * <code>compressor</code>.
    * @param codec
    * @param compressor
-   * @Param cellScanner
+   * @param cellScanner
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded
using
    * passed in <code>codec</code> and/or <code>compressor</code>;
the returned buffer has been
    * flipped and is ready for reading.  Use limit to find total size.
    * @throws IOException
    */
   @SuppressWarnings("resource")
-  ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
   throws IOException {
     if (cellScanner == null) return null;
@@ -145,7 +145,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte [] cellBlock)
   throws IOException {
     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
@@ -159,7 +159,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte [] cellBlock, final int offset, final int length)
   throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
@@ -200,7 +200,7 @@ class IPCUtil {
    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code>
is null
    * @throws IOException
    */
-  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+  public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException
{
     if (m == null) return null;
     int serializedSize = m.getSerializedSize();
     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
@@ -223,7 +223,7 @@ class IPCUtil {
    * @return Total number of bytes written.
    * @throws IOException
    */
-  static int write(final OutputStream dos, final Message header, final Message param,
+  public static int write(final OutputStream dos, final Message header, final Message param,
       final ByteBuffer cellBlock)
   throws IOException {
     // Must calculate total size and write that first so other side can read it all in in
one
@@ -255,7 +255,7 @@ class IPCUtil {
    * @param len
    * @throws IOException
    */
-  static void readChunked(final DataInput in, byte[] dest, int offset, int len)
+  public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
       throws IOException {
     int maxRead = 8192;
 
@@ -265,11 +265,9 @@ class IPCUtil {
   }
 
   /**
-   * @param header
-   * @param body
    * @return Size on the wire when the two messages are written with writeDelimitedTo
    */
-  static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
     int totalSize = 0;
     for (Message m: messages) {
       if (m == null) continue;


Mime
View raw message